From ef8340a1c21f96911df68e675f40946ac18287bc Mon Sep 17 00:00:00 2001 From: zhouhao Date: Tue, 18 Feb 2020 19:34:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=9B=91=E6=8E=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../micrometer/MeterRegistryManager.java | 9 +- .../micrometer/MeterRegistrySupplier.java | 3 +- .../MicrometerDeviceGatewayMonitor.java | 11 +- .../MicrometerGatewayMonitorSupplier.java | 4 +- .../MicrometerMessageGatewayMonitor.java | 11 +- .../DeviceGatewayMeasurement.java | 187 +++++++++++------- .../DeviceGatewayMeasurementProvider.java | 12 +- .../micrometer/ElasticNamingConvention.java | 39 ++++ .../micrometer/MeterTimeSeriesMetadata.java | 15 +- .../micrometer/TimeSeriesMeterRegistry.java | 11 +- .../TimeSeriesMeterRegistrySupplier.java | 20 +- .../TimeSeriesRegistryProperties.java | 9 +- .../message/DeviceMessageMeasurement.java | 15 +- .../DeviceMessageMeasurementProvider.java | 3 +- .../DeviceStatusMeasurementProvider.java | 4 +- .../status/DeviceStatusRecordMeasurement.java | 6 +- 16 files changed, 245 insertions(+), 114 deletions(-) create mode 100644 jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/ElasticNamingConvention.java diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistryManager.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistryManager.java index 26bc2331..ac82f73b 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistryManager.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistryManager.java @@ -26,16 +26,15 @@ public class MeterRegistryManager { @Autowired private List suppliers; - private MeterRegistry createMeterRegistry(String metric) { + private MeterRegistry createMeterRegistry(String metric, String... tagKeys) { return new CompositeMeterRegistry(Clock.SYSTEM, suppliers.stream() - .map(supplier -> supplier.getMeterRegistry(metric)) + .map(supplier -> supplier.getMeterRegistry(metric, tagKeys)) .collect(Collectors.toList())); } - public MeterRegistry getMeterRegister(String metric) { - return meterRegistryMap.computeIfAbsent(metric, this::createMeterRegistry); + public MeterRegistry getMeterRegister(String metric, String... tagKeys) { + return meterRegistryMap.computeIfAbsent(metric, _metric -> createMeterRegistry(_metric, tagKeys)); } - } diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistrySupplier.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistrySupplier.java index 0c12507f..bdf7ba07 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistrySupplier.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistrySupplier.java @@ -4,7 +4,6 @@ import io.micrometer.core.instrument.MeterRegistry; public interface MeterRegistrySupplier { - MeterRegistry getMeterRegistry(String metric); - + MeterRegistry getMeterRegistry(String metric, String... tagKeys); } diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/MicrometerDeviceGatewayMonitor.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/MicrometerDeviceGatewayMonitor.java index 39412c69..95063d7f 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/MicrometerDeviceGatewayMonitor.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/MicrometerDeviceGatewayMonitor.java @@ -27,9 +27,8 @@ class MicrometerDeviceGatewayMonitor implements DeviceGatewayMonitor { this.connected = getCounter("connected"); this.rejected = getCounter("rejected"); this.disconnected = getCounter("disconnected"); - this.sentMessage = getCounter("sentMessage"); - this.receivedMessage = getCounter("receivedMessage"); - + this.sentMessage = getCounter("sent_message"); + this.receivedMessage = getCounter("received_message"); } final Counter connected; @@ -69,13 +68,11 @@ class MicrometerDeviceGatewayMonitor implements DeviceGatewayMonitor { @Override public void receivedMessage() { - receivedMessage - .increment(); + receivedMessage.increment(); } @Override public void sentMessage() { - sentMessage - .increment(); + sentMessage.increment(); } } diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/MicrometerGatewayMonitorSupplier.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/MicrometerGatewayMonitorSupplier.java index 94093c3b..3556e292 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/MicrometerGatewayMonitorSupplier.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/MicrometerGatewayMonitorSupplier.java @@ -17,11 +17,11 @@ public class MicrometerGatewayMonitorSupplier implements MessageGatewayMonitorSu @Override public MessageGatewayMonitor getMessageGatewayMonitor(String id, String... tags) { - return new MicrometerMessageGatewayMonitor(meterRegistryManager.getMeterRegister(GatewayTimeSeriesMetric.messageGatewayMetric), id, tags); + return new MicrometerMessageGatewayMonitor(meterRegistryManager.getMeterRegister(GatewayTimeSeriesMetric.messageGatewayMetric, "target", "connector"), id, tags); } @Override public DeviceGatewayMonitor getDeviceGatewayMonitor(String id, String... tags) { - return new MicrometerDeviceGatewayMonitor(meterRegistryManager.getMeterRegister(GatewayTimeSeriesMetric.deviceGatewayMetric), id, tags); + return new MicrometerDeviceGatewayMonitor(meterRegistryManager.getMeterRegister(GatewayTimeSeriesMetric.deviceGatewayMetric, "target"), id, tags); } } diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/MicrometerMessageGatewayMonitor.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/MicrometerMessageGatewayMonitor.java index 5b436f9c..362b30f8 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/MicrometerMessageGatewayMonitor.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/MicrometerMessageGatewayMonitor.java @@ -25,11 +25,11 @@ class MicrometerMessageGatewayMonitor implements MessageGatewayMonitor { .tag("target", "sessionNumber") .register(registry); - this.acceptedSession=getCounter("acceptedSession"); - this.closedSession=getCounter("closedSession"); - this.subscribed=getCounter("subscribed"); - this.unsubscribed=getCounter("unsubscribed"); - this.acceptMessage=getCounter("acceptMessage"); + this.acceptedSession = getCounter("accepted_session"); + this.closedSession = getCounter("closed_session"); + this.subscribed = getCounter("subscribed"); + this.unsubscribed = getCounter("unsubscribed"); + this.acceptMessage = getCounter("accept_message"); } @@ -52,6 +52,7 @@ class MicrometerMessageGatewayMonitor implements MessageGatewayMonitor { .tag("target", target) .register(registry); } + @Override public void acceptedSession() { acceptedSession diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/measurements/DeviceGatewayMeasurement.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/measurements/DeviceGatewayMeasurement.java index 03fb1989..e13a8747 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/measurements/DeviceGatewayMeasurement.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/measurements/DeviceGatewayMeasurement.java @@ -1,5 +1,7 @@ package org.jetlinks.community.gateway.monitor.measurements; +import org.hswebframework.web.api.crud.entity.QueryParamEntity; +import org.jetlinks.community.timeseries.query.Aggregation; import org.jetlinks.core.metadata.ConfigMetadata; import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.DefaultConfigMetadata; @@ -21,82 +23,133 @@ import java.util.Date; class DeviceGatewayMeasurement extends StaticMeasurement { - static MeasurementDefinition definition = new MeasurementDefinition() { - @Override - public String getId() { - return "device-gateway-message-quantity"; + private TimeSeriesManager timeSeriesManager; + + private String type; + + private Aggregation defaultAgg; + + private String property; + + public DeviceGatewayMeasurement(MeasurementDefinition definition, + String property, + Aggregation defaultAgg, + TimeSeriesManager timeSeriesManager) { + super(definition); + this.timeSeriesManager = timeSeriesManager; + this.defaultAgg = defaultAgg; + this.type = definition.getId(); + this.property = property; + addDimension(new AggDeviceStateDimension()); + addDimension(new HistoryDimension()); } - @Override - public String getName() { - return "设备网关数据量"; - } - }; + static ConfigMetadata historyConfigMetadata = new DefaultConfigMetadata() + .add("gatewayId", "网关", "", new StringType()) + .add("time", "周期", "例如: 1h,10m,30s", new StringType()) + .add("format", "时间格式", "如: MM-dd:HH", new StringType()) + .add("limit", "最大数据量", "", new IntType()) + .add("from", "时间从", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss")) + .add("to", "时间至", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss")); - private TimeSeriesManager timeSeriesManager; - public DeviceGatewayMeasurement(TimeSeriesManager timeSeriesManager) { - super(definition); - this.timeSeriesManager = timeSeriesManager; - addDimension(new AggDeviceStateDimension()); - } + class HistoryDimension implements MeasurementDimension { - static ConfigMetadata historyConfigMetadata = new DefaultConfigMetadata() - .add("time", "周期", "例如: 1h,10m,30s", new StringType()) - .add("format", "时间格式", "如: MM-dd:HH", new StringType()) - .add("type", "类型", "", new EnumType() - .addElement(EnumType.Element.of("connected", "创建连接数")) - .addElement(EnumType.Element.of("rejected", "拒绝连接数")) - .addElement(EnumType.Element.of("disconnected", "断开连接数")) - .addElement(EnumType.Element.of("receivedMessage", "接收消息数")) - .addElement(EnumType.Element.of("sentMessage", "发送消息数")) - ) - .add("limit", "最大数据量", "", new IntType()) - .add("from", "时间从", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss")) - .add("to", "时间至", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss")); + @Override + public DimensionDefinition getDefinition() { + return CommonDimensionDefinition.history; + } - static DataType historyValueType = new IntType(); + @Override + public DataType getValueType() { + return new IntType(); + } - class AggDeviceStateDimension implements MeasurementDimension { + @Override + public ConfigMetadata getParams() { + return historyConfigMetadata; + } - @Override - public DimensionDefinition getDefinition() { - return CommonDimensionDefinition.agg; + @Override + public boolean isRealTime() { + return false; + } + + @Override + public Flux getValue(MeasurementParameter parameter) { + return QueryParamEntity.newQuery() + .where("target", type) + .is("name", parameter.getString("gatewayId").orElse(null)) + .doPaging(0, parameter.getInt("limit").orElse(1)) + .between("timestamp", + parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())), + parameter.getDate("to").orElseGet(Date::new) + ) + .execute(timeSeriesManager.getService(GatewayTimeSeriesMetric.deviceGatewayMetric())::query) + .map(data -> SimpleMeasurementValue.of( + data.getInt(property).orElse(0), + data.getTimestamp())); + } } - @Override - public DataType getValueType() { - return historyValueType; + + static ConfigMetadata aggConfigMetadata = new DefaultConfigMetadata() + .add("gatewayId", "网关", "", new StringType()) + .add("time", "周期", "例如: 1h,10m,30s", new StringType()) + .add("format", "时间格式", "如: MM-dd:HH", new StringType()) + .add("agg", "聚合方式", "", new EnumType() + .addElement(EnumType.Element.of("SUM", "总和")) + .addElement(EnumType.Element.of("MAX", "最大值")) + .addElement(EnumType.Element.of("MIN", "最小值")) + .addElement(EnumType.Element.of("AVG", "平局值")) + ) + .add("limit", "最大数据量", "", new IntType()) + .add("from", "时间从", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss")) + .add("to", "时间至", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss")); + + + //聚合数据 + class AggDeviceStateDimension implements MeasurementDimension { + + @Override + public DimensionDefinition getDefinition() { + return CommonDimensionDefinition.agg; + } + + @Override + public DataType getValueType() { + return new IntType(); + } + + @Override + public ConfigMetadata getParams() { + return aggConfigMetadata; + } + + @Override + public boolean isRealTime() { + return false; + } + + @Override + public Flux getValue(MeasurementParameter parameter) { + + return AggregationQueryParam.of() + .agg(property, parameter.get("agg", Aggregation.class).orElse(defaultAgg)) + .groupBy(parameter.getDuration("time").orElse(Duration.ofHours(1)), + "time", + parameter.getString("format").orElse("MM-dd:HH")) + .filter(query -> query + .where("target", type) + .is("name", parameter.getString("gatewayId").orElse(null))) + .limit(parameter.getInt("limit").orElse(1)) + .from(parameter.getDate("from").orElseGet(()->Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant()))) + .to(parameter.getDate("to").orElse(new Date())) + .execute(timeSeriesManager.getService(GatewayTimeSeriesMetric.deviceGatewayMetric())::aggregation) + .map(data -> SimpleMeasurementValue.of( + data.getInt(property).orElse(0), + data.getString("time").orElse(""), + System.currentTimeMillis())); + } } - - @Override - public ConfigMetadata getParams() { - return historyConfigMetadata; - } - - @Override - public boolean isRealTime() { - return false; - } - - @Override - public Flux getValue(MeasurementParameter parameter) { - - return AggregationQueryParam.of() - .sum("count") - .groupBy(parameter.getDuration("time").orElse(Duration.ofHours(1)), - "time", - parameter.getString("format").orElse("MM-dd:HH")) - .filter(query -> query.where("target", parameter.getString("type").orElse("connected"))) - .limit(parameter.getInt("limit").orElse(1)) - .from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant()))) - .to(parameter.getDate("to").orElse(new Date())) - .execute(timeSeriesManager.getService(GatewayTimeSeriesMetric.deviceGatewayMetric())::aggregation) - .map(data -> SimpleMeasurementValue.of( - data.getInt("count").orElse(0), - data.getString("time").orElse(""), - System.currentTimeMillis())); - } - } - } diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/measurements/DeviceGatewayMeasurementProvider.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/measurements/DeviceGatewayMeasurementProvider.java index e801b8d8..b560f8ef 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/measurements/DeviceGatewayMeasurementProvider.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/monitor/measurements/DeviceGatewayMeasurementProvider.java @@ -1,15 +1,25 @@ package org.jetlinks.community.gateway.monitor.measurements; +import org.jetlinks.community.dashboard.MeasurementDefinition; import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider; import org.jetlinks.community.timeseries.TimeSeriesManager; +import org.jetlinks.community.timeseries.query.Aggregation; import org.springframework.stereotype.Component; +import static org.jetlinks.community.dashboard.MeasurementDefinition.*; + @Component public class DeviceGatewayMeasurementProvider extends StaticMeasurementProvider { public DeviceGatewayMeasurementProvider(TimeSeriesManager timeSeriesManager) { super(GatewayDashboardDefinition.gatewayMonitor, GatewayObjectDefinition.deviceGateway); - addMeasurement(new DeviceGatewayMeasurement(timeSeriesManager)); + addMeasurement(new DeviceGatewayMeasurement(of("connection", "连接数"), "value", Aggregation.MAX, timeSeriesManager)); + + addMeasurement(new DeviceGatewayMeasurement(of("connected", "创建连接数"), "count", Aggregation.SUM, timeSeriesManager)); + addMeasurement(new DeviceGatewayMeasurement(of("rejected", "拒绝连接数"), "count", Aggregation.SUM, timeSeriesManager)); + addMeasurement(new DeviceGatewayMeasurement(of("disconnected", "断开连接数"), "count", Aggregation.SUM, timeSeriesManager)); + addMeasurement(new DeviceGatewayMeasurement(of("received_message", "接收消息数"), "count", Aggregation.SUM, timeSeriesManager)); + addMeasurement(new DeviceGatewayMeasurement(of("sent_message", "发送消息数"), "count", Aggregation.SUM, timeSeriesManager)); } } diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/ElasticNamingConvention.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/ElasticNamingConvention.java new file mode 100644 index 00000000..d13aefdb --- /dev/null +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/ElasticNamingConvention.java @@ -0,0 +1,39 @@ +package org.jetlinks.community.timeseries.micrometer; + +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.config.NamingConvention; + +import java.util.regex.Pattern; + +public class ElasticNamingConvention implements NamingConvention { + + private static final Pattern FIRST_UNDERSCORE_PATTERN = Pattern.compile("^_+"); + + private final NamingConvention delegate; + + public ElasticNamingConvention() { + this(NamingConvention.snakeCase); + } + + public ElasticNamingConvention(NamingConvention delegate) { + this.delegate = delegate; + } + + @Override + public String name(String name, Meter.Type type, String baseUnit) { + return delegate.name(name, type, baseUnit); + } + + @Override + public String tagKey(String key) { + if (key.equals("name")) { + key = "name.tag"; + } else if (key.equals("type")) { + key = "type.tag"; + } else if (key.startsWith("_")) { + // Fields that start with _ are considered reserved and ignored by Kibana. See https://github.com/elastic/kibana/issues/2551 + key = FIRST_UNDERSCORE_PATTERN.matcher(key).replaceFirst(""); + } + return delegate.tagKey(key); + } +} \ No newline at end of file diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/MeterTimeSeriesMetadata.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/MeterTimeSeriesMetadata.java index d2a7df12..a94e5def 100644 --- a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/MeterTimeSeriesMetadata.java +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/MeterTimeSeriesMetadata.java @@ -17,6 +17,8 @@ class MeterTimeSeriesMetadata implements TimeSeriesMetadata { @Getter private TimeSeriesMetric metric; + @Getter + private List keys; static final List properties = new ArrayList<>(); @@ -110,12 +112,19 @@ class MeterTimeSeriesMetadata implements TimeSeriesMetadata { property.setValueType(new DoubleType()); properties.add(property); } - - } @Override public List getProperties() { - return new ArrayList<>(properties); + + List metadata = new ArrayList<>(properties); + for (String key : keys) { + SimplePropertyMetadata property = new SimplePropertyMetadata(); + property.setId(key); + property.setName(key); + property.setValueType(new StringType()); + metadata.add(property); + } + return metadata; } } diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesMeterRegistry.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesMeterRegistry.java index 138cbe7e..a278efc0 100644 --- a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesMeterRegistry.java +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesMeterRegistry.java @@ -9,6 +9,9 @@ import org.jetlinks.community.timeseries.TimeSeriesMetric; import reactor.core.publisher.Flux; import javax.annotation.Nonnull; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -24,22 +27,26 @@ public class TimeSeriesMeterRegistry extends StepMeterRegistry { private Map customTags; + private List keys = new ArrayList<>(); public TimeSeriesMeterRegistry(TimeSeriesManager timeSeriesManager, TimeSeriesMetric metric, TimeSeriesRegistryProperties config, - Map customTags) { + Map customTags,String ...tagKeys) { super(new TimeSeriesPropertiesPropertiesConfigAdapter(config), Clock.SYSTEM); this.timeSeriesManager = timeSeriesManager; this.metric = metric; this.customTags = customTags; + keys.addAll(customTags.keySet()); + keys.addAll(Arrays.asList(tagKeys)); + keys.addAll(config.getCustomTagKeys()); start(DEFAULT_THREAD_FACTORY); } @Override public void start(ThreadFactory threadFactory) { super.start(threadFactory); - timeSeriesManager.registerMetadata(MeterTimeSeriesMetadata.of(metric)) + timeSeriesManager.registerMetadata(MeterTimeSeriesMetadata.of(metric,keys)) .doOnError(e -> log.error("register metric metadata error", e)) .subscribe((r) -> log.error("register metric [{}] metadata success", metric.getId())); } diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesMeterRegistrySupplier.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesMeterRegistrySupplier.java index b649dcff..bea178d8 100644 --- a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesMeterRegistrySupplier.java +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesMeterRegistrySupplier.java @@ -1,6 +1,7 @@ package org.jetlinks.community.timeseries.micrometer; import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.config.NamingConvention; import lombok.Getter; import lombok.Setter; import org.jetlinks.community.micrometer.MeterRegistrySupplier; @@ -14,6 +15,16 @@ public class TimeSeriesMeterRegistrySupplier implements MeterRegistrySupplier { private final TimeSeriesManager timeSeriesManager; + @Getter + @Setter + private String naming = "elastic"; + + static Map namingSupports = new HashMap<>(); + + static { + namingSupports.put("elastic", new ElasticNamingConvention()); + } + @Getter @Setter private Map tags = new HashMap<>(); @@ -28,8 +39,13 @@ public class TimeSeriesMeterRegistrySupplier implements MeterRegistrySupplier { } @Override - public MeterRegistry getMeterRegistry(String metric) { - return new TimeSeriesMeterRegistry(timeSeriesManager, TimeSeriesMetric.of(metric), metrics.getOrDefault(metric, metrics.get("default")), tags); + public MeterRegistry getMeterRegistry(String metric, String... tagKeys) { + TimeSeriesMeterRegistry registry = new TimeSeriesMeterRegistry(timeSeriesManager, + TimeSeriesMetric.of(metric), + metrics.getOrDefault(metric, metrics.get("default")), + tags, tagKeys); + registry.config().namingConvention(namingSupports.get(naming)); + return registry; } } diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesRegistryProperties.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesRegistryProperties.java index 89f9ad13..3cb5e9b2 100644 --- a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesRegistryProperties.java +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesRegistryProperties.java @@ -1,7 +1,14 @@ package org.jetlinks.community.timeseries.micrometer; +import lombok.Getter; +import lombok.Setter; import org.springframework.boot.actuate.autoconfigure.metrics.export.properties.StepRegistryProperties; -public class TimeSeriesRegistryProperties extends StepRegistryProperties { +import java.util.ArrayList; +import java.util.List; +@Getter +@Setter +public class TimeSeriesRegistryProperties extends StepRegistryProperties { + private List customTagKeys = new ArrayList<>(); } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java index b5052df1..69311b30 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java @@ -25,17 +25,7 @@ class DeviceMessageMeasurement extends StaticMeasurement { private TimeSeriesManager timeSeriesManager; - static MeasurementDefinition definition = new MeasurementDefinition() { - @Override - public String getId() { - return "quantity"; - } - - @Override - public String getName() { - return "设备消息量"; - } - }; + static MeasurementDefinition definition = MeasurementDefinition.of("quantity", "设备消息量"); public DeviceMessageMeasurement(MessageGateway messageGateway, TimeSeriesManager timeSeriesManager) { super(definition); @@ -86,9 +76,9 @@ class DeviceMessageMeasurement extends StaticMeasurement { static ConfigMetadata historyConfigMetadata = new DefaultConfigMetadata() + .add("productId", "设备型号", "", new StringType()) .add("time", "周期", "例如: 1h,10m,30s", new StringType()) .add("format", "时间格式", "如: MM-dd:HH", new StringType()) - .add("productId", "设备型号", "", new StringType()) .add("msgType", "消息类型", "", new StringType()) .add("limit", "最大数据量", "", new IntType()) .add("from", "时间从", "", new DateTimeType()) @@ -129,7 +119,6 @@ class DeviceMessageMeasurement extends StaticMeasurement { query.where("name", "message-count") .is("productId", parameter.getString("productId").orElse(null)) .is("msgType", parameter.getString("msgType").orElse(null)) - ) .limit(parameter.getInt("limit").orElse(1)) .from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant()))) diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurementProvider.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurementProvider.java index ef5b256f..945bc686 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurementProvider.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurementProvider.java @@ -26,7 +26,8 @@ public class DeviceMessageMeasurementProvider extends StaticMeasurementProvider addMeasurement(new DeviceMessageMeasurement(messageGateway, timeSeriesManager)); - MeterRegistry registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId()); + MeterRegistry registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId(), + "target", "msgType", "productId"); //订阅设备消息,用于统计设备消息量 messageGateway.subscribe("/device/*/message/**") diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusMeasurementProvider.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusMeasurementProvider.java index 58affa25..20680d0a 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusMeasurementProvider.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusMeasurementProvider.java @@ -32,8 +32,8 @@ public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider { addMeasurement(new DeviceStatusRecordMeasurement(instanceService, timeSeriesManager)); - MeterRegistry registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId()); - + MeterRegistry registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId(), + "target", "msgType", "productId"); Map productCounts = new ConcurrentHashMap<>(); Function counterAdder = productId -> diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusRecordMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusRecordMeasurement.java index 1639f5c0..954b65a7 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusRecordMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusRecordMeasurement.java @@ -42,6 +42,7 @@ class DeviceStatusRecordMeasurement } static ConfigMetadata aggConfigMetadata = new DefaultConfigMetadata() + .add("productId", "设备型号", "", new StringType()) .add("time", "周期", "例如: 1h,10m,30s", new StringType()) .add("format", "时间格式", "如: MM-dd:HH", new StringType()) .add("limit", "最大数据量", "", new IntType()) @@ -76,7 +77,10 @@ class DeviceStatusRecordMeasurement public Flux getValue(MeasurementParameter parameter) { return AggregationQueryParam.of() .max("value") - .filter(query -> query.where("name", "online-count")) + .filter(query -> + query.where("name", "online-count") + .is("productId", parameter.getString("productId").orElse(null)) + ) .from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-30).atZone(ZoneId.systemDefault()).toInstant()))) .to(parameter.getDate("to").orElse(new Date())) .groupBy(parameter.getDuration("time").orElse(Duration.ofDays(1)),