优化监控

This commit is contained in:
zhouhao 2020-02-18 19:34:43 +08:00
parent 2d48f46e65
commit ef8340a1c2
16 changed files with 245 additions and 114 deletions

View File

@ -26,16 +26,15 @@ public class MeterRegistryManager {
@Autowired @Autowired
private List<MeterRegistrySupplier> suppliers; private List<MeterRegistrySupplier> suppliers;
private MeterRegistry createMeterRegistry(String metric) { private MeterRegistry createMeterRegistry(String metric, String... tagKeys) {
return new CompositeMeterRegistry(Clock.SYSTEM, return new CompositeMeterRegistry(Clock.SYSTEM,
suppliers.stream() suppliers.stream()
.map(supplier -> supplier.getMeterRegistry(metric)) .map(supplier -> supplier.getMeterRegistry(metric, tagKeys))
.collect(Collectors.toList())); .collect(Collectors.toList()));
} }
public MeterRegistry getMeterRegister(String metric) { public MeterRegistry getMeterRegister(String metric, String... tagKeys) {
return meterRegistryMap.computeIfAbsent(metric, this::createMeterRegistry); return meterRegistryMap.computeIfAbsent(metric, _metric -> createMeterRegistry(_metric, tagKeys));
} }
} }

View File

@ -4,7 +4,6 @@ import io.micrometer.core.instrument.MeterRegistry;
public interface MeterRegistrySupplier { public interface MeterRegistrySupplier {
MeterRegistry getMeterRegistry(String metric); MeterRegistry getMeterRegistry(String metric, String... tagKeys);
} }

View File

@ -27,9 +27,8 @@ class MicrometerDeviceGatewayMonitor implements DeviceGatewayMonitor {
this.connected = getCounter("connected"); this.connected = getCounter("connected");
this.rejected = getCounter("rejected"); this.rejected = getCounter("rejected");
this.disconnected = getCounter("disconnected"); this.disconnected = getCounter("disconnected");
this.sentMessage = getCounter("sentMessage"); this.sentMessage = getCounter("sent_message");
this.receivedMessage = getCounter("receivedMessage"); this.receivedMessage = getCounter("received_message");
} }
final Counter connected; final Counter connected;
@ -69,13 +68,11 @@ class MicrometerDeviceGatewayMonitor implements DeviceGatewayMonitor {
@Override @Override
public void receivedMessage() { public void receivedMessage() {
receivedMessage receivedMessage.increment();
.increment();
} }
@Override @Override
public void sentMessage() { public void sentMessage() {
sentMessage sentMessage.increment();
.increment();
} }
} }

View File

@ -17,11 +17,11 @@ public class MicrometerGatewayMonitorSupplier implements MessageGatewayMonitorSu
@Override @Override
public MessageGatewayMonitor getMessageGatewayMonitor(String id, String... tags) { public MessageGatewayMonitor getMessageGatewayMonitor(String id, String... tags) {
return new MicrometerMessageGatewayMonitor(meterRegistryManager.getMeterRegister(GatewayTimeSeriesMetric.messageGatewayMetric), id, tags); return new MicrometerMessageGatewayMonitor(meterRegistryManager.getMeterRegister(GatewayTimeSeriesMetric.messageGatewayMetric, "target", "connector"), id, tags);
} }
@Override @Override
public DeviceGatewayMonitor getDeviceGatewayMonitor(String id, String... tags) { public DeviceGatewayMonitor getDeviceGatewayMonitor(String id, String... tags) {
return new MicrometerDeviceGatewayMonitor(meterRegistryManager.getMeterRegister(GatewayTimeSeriesMetric.deviceGatewayMetric), id, tags); return new MicrometerDeviceGatewayMonitor(meterRegistryManager.getMeterRegister(GatewayTimeSeriesMetric.deviceGatewayMetric, "target"), id, tags);
} }
} }

View File

@ -25,11 +25,11 @@ class MicrometerMessageGatewayMonitor implements MessageGatewayMonitor {
.tag("target", "sessionNumber") .tag("target", "sessionNumber")
.register(registry); .register(registry);
this.acceptedSession=getCounter("acceptedSession"); this.acceptedSession = getCounter("accepted_session");
this.closedSession=getCounter("closedSession"); this.closedSession = getCounter("closed_session");
this.subscribed=getCounter("subscribed"); this.subscribed = getCounter("subscribed");
this.unsubscribed=getCounter("unsubscribed"); this.unsubscribed = getCounter("unsubscribed");
this.acceptMessage=getCounter("acceptMessage"); this.acceptMessage = getCounter("accept_message");
} }
@ -52,6 +52,7 @@ class MicrometerMessageGatewayMonitor implements MessageGatewayMonitor {
.tag("target", target) .tag("target", target)
.register(registry); .register(registry);
} }
@Override @Override
public void acceptedSession() { public void acceptedSession() {
acceptedSession acceptedSession

View File

@ -1,5 +1,7 @@
package org.jetlinks.community.gateway.monitor.measurements; package org.jetlinks.community.gateway.monitor.measurements;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.community.timeseries.query.Aggregation;
import org.jetlinks.core.metadata.ConfigMetadata; import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.DefaultConfigMetadata; import org.jetlinks.core.metadata.DefaultConfigMetadata;
@ -21,82 +23,133 @@ import java.util.Date;
class DeviceGatewayMeasurement extends StaticMeasurement { class DeviceGatewayMeasurement extends StaticMeasurement {
static MeasurementDefinition definition = new MeasurementDefinition() { private TimeSeriesManager timeSeriesManager;
@Override
public String getId() { private String type;
return "device-gateway-message-quantity";
private Aggregation defaultAgg;
private String property;
public DeviceGatewayMeasurement(MeasurementDefinition definition,
String property,
Aggregation defaultAgg,
TimeSeriesManager timeSeriesManager) {
super(definition);
this.timeSeriesManager = timeSeriesManager;
this.defaultAgg = defaultAgg;
this.type = definition.getId();
this.property = property;
addDimension(new AggDeviceStateDimension());
addDimension(new HistoryDimension());
} }
@Override static ConfigMetadata historyConfigMetadata = new DefaultConfigMetadata()
public String getName() { .add("gatewayId", "网关", "", new StringType())
return "设备网关数据量"; .add("time", "周期", "例如: 1h,10m,30s", new StringType())
} .add("format", "时间格式", "如: MM-dd:HH", new StringType())
}; .add("limit", "最大数据量", "", new IntType())
.add("from", "时间从", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss"))
.add("to", "时间至", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss"));
private TimeSeriesManager timeSeriesManager;
public DeviceGatewayMeasurement(TimeSeriesManager timeSeriesManager) { class HistoryDimension implements MeasurementDimension {
super(definition);
this.timeSeriesManager = timeSeriesManager;
addDimension(new AggDeviceStateDimension());
}
static ConfigMetadata historyConfigMetadata = new DefaultConfigMetadata() @Override
.add("time", "周期", "例如: 1h,10m,30s", new StringType()) public DimensionDefinition getDefinition() {
.add("format", "时间格式", "如: MM-dd:HH", new StringType()) return CommonDimensionDefinition.history;
.add("type", "类型", "", new EnumType() }
.addElement(EnumType.Element.of("connected", "创建连接数"))
.addElement(EnumType.Element.of("rejected", "拒绝连接数"))
.addElement(EnumType.Element.of("disconnected", "断开连接数"))
.addElement(EnumType.Element.of("receivedMessage", "接收消息数"))
.addElement(EnumType.Element.of("sentMessage", "发送消息数"))
)
.add("limit", "最大数据量", "", new IntType())
.add("from", "时间从", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss"))
.add("to", "时间至", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss"));
static DataType historyValueType = new IntType(); @Override
public DataType getValueType() {
return new IntType();
}
class AggDeviceStateDimension implements MeasurementDimension { @Override
public ConfigMetadata getParams() {
return historyConfigMetadata;
}
@Override @Override
public DimensionDefinition getDefinition() { public boolean isRealTime() {
return CommonDimensionDefinition.agg; return false;
}
@Override
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
return QueryParamEntity.newQuery()
.where("target", type)
.is("name", parameter.getString("gatewayId").orElse(null))
.doPaging(0, parameter.getInt("limit").orElse(1))
.between("timestamp",
parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())),
parameter.getDate("to").orElseGet(Date::new)
)
.execute(timeSeriesManager.getService(GatewayTimeSeriesMetric.deviceGatewayMetric())::query)
.map(data -> SimpleMeasurementValue.of(
data.getInt(property).orElse(0),
data.getTimestamp()));
}
} }
@Override
public DataType getValueType() { static ConfigMetadata aggConfigMetadata = new DefaultConfigMetadata()
return historyValueType; .add("gatewayId", "网关", "", new StringType())
.add("time", "周期", "例如: 1h,10m,30s", new StringType())
.add("format", "时间格式", "如: MM-dd:HH", new StringType())
.add("agg", "聚合方式", "", new EnumType()
.addElement(EnumType.Element.of("SUM", "总和"))
.addElement(EnumType.Element.of("MAX", "最大值"))
.addElement(EnumType.Element.of("MIN", "最小值"))
.addElement(EnumType.Element.of("AVG", "平局值"))
)
.add("limit", "最大数据量", "", new IntType())
.add("from", "时间从", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss"))
.add("to", "时间至", "", new DateTimeType().format("yyyy-MM-dd HH:mm:ss"));
//聚合数据
class AggDeviceStateDimension implements MeasurementDimension {
@Override
public DimensionDefinition getDefinition() {
return CommonDimensionDefinition.agg;
}
@Override
public DataType getValueType() {
return new IntType();
}
@Override
public ConfigMetadata getParams() {
return aggConfigMetadata;
}
@Override
public boolean isRealTime() {
return false;
}
@Override
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
return AggregationQueryParam.of()
.agg(property, parameter.get("agg", Aggregation.class).orElse(defaultAgg))
.groupBy(parameter.getDuration("time").orElse(Duration.ofHours(1)),
"time",
parameter.getString("format").orElse("MM-dd:HH"))
.filter(query -> query
.where("target", type)
.is("name", parameter.getString("gatewayId").orElse(null)))
.limit(parameter.getInt("limit").orElse(1))
.from(parameter.getDate("from").orElseGet(()->Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))
.to(parameter.getDate("to").orElse(new Date()))
.execute(timeSeriesManager.getService(GatewayTimeSeriesMetric.deviceGatewayMetric())::aggregation)
.map(data -> SimpleMeasurementValue.of(
data.getInt(property).orElse(0),
data.getString("time").orElse(""),
System.currentTimeMillis()));
}
} }
@Override
public ConfigMetadata getParams() {
return historyConfigMetadata;
}
@Override
public boolean isRealTime() {
return false;
}
@Override
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
return AggregationQueryParam.of()
.sum("count")
.groupBy(parameter.getDuration("time").orElse(Duration.ofHours(1)),
"time",
parameter.getString("format").orElse("MM-dd:HH"))
.filter(query -> query.where("target", parameter.getString("type").orElse("connected")))
.limit(parameter.getInt("limit").orElse(1))
.from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))
.to(parameter.getDate("to").orElse(new Date()))
.execute(timeSeriesManager.getService(GatewayTimeSeriesMetric.deviceGatewayMetric())::aggregation)
.map(data -> SimpleMeasurementValue.of(
data.getInt("count").orElse(0),
data.getString("time").orElse(""),
System.currentTimeMillis()));
}
}
} }

View File

@ -1,15 +1,25 @@
package org.jetlinks.community.gateway.monitor.measurements; package org.jetlinks.community.gateway.monitor.measurements;
import org.jetlinks.community.dashboard.MeasurementDefinition;
import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider; import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
import org.jetlinks.community.timeseries.TimeSeriesManager; import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.query.Aggregation;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import static org.jetlinks.community.dashboard.MeasurementDefinition.*;
@Component @Component
public class DeviceGatewayMeasurementProvider extends StaticMeasurementProvider { public class DeviceGatewayMeasurementProvider extends StaticMeasurementProvider {
public DeviceGatewayMeasurementProvider(TimeSeriesManager timeSeriesManager) { public DeviceGatewayMeasurementProvider(TimeSeriesManager timeSeriesManager) {
super(GatewayDashboardDefinition.gatewayMonitor, GatewayObjectDefinition.deviceGateway); super(GatewayDashboardDefinition.gatewayMonitor, GatewayObjectDefinition.deviceGateway);
addMeasurement(new DeviceGatewayMeasurement(timeSeriesManager)); addMeasurement(new DeviceGatewayMeasurement(of("connection", "连接数"), "value", Aggregation.MAX, timeSeriesManager));
addMeasurement(new DeviceGatewayMeasurement(of("connected", "创建连接数"), "count", Aggregation.SUM, timeSeriesManager));
addMeasurement(new DeviceGatewayMeasurement(of("rejected", "拒绝连接数"), "count", Aggregation.SUM, timeSeriesManager));
addMeasurement(new DeviceGatewayMeasurement(of("disconnected", "断开连接数"), "count", Aggregation.SUM, timeSeriesManager));
addMeasurement(new DeviceGatewayMeasurement(of("received_message", "接收消息数"), "count", Aggregation.SUM, timeSeriesManager));
addMeasurement(new DeviceGatewayMeasurement(of("sent_message", "发送消息数"), "count", Aggregation.SUM, timeSeriesManager));
} }
} }

View File

@ -0,0 +1,39 @@
package org.jetlinks.community.timeseries.micrometer;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.config.NamingConvention;
import java.util.regex.Pattern;
public class ElasticNamingConvention implements NamingConvention {
private static final Pattern FIRST_UNDERSCORE_PATTERN = Pattern.compile("^_+");
private final NamingConvention delegate;
public ElasticNamingConvention() {
this(NamingConvention.snakeCase);
}
public ElasticNamingConvention(NamingConvention delegate) {
this.delegate = delegate;
}
@Override
public String name(String name, Meter.Type type, String baseUnit) {
return delegate.name(name, type, baseUnit);
}
@Override
public String tagKey(String key) {
if (key.equals("name")) {
key = "name.tag";
} else if (key.equals("type")) {
key = "type.tag";
} else if (key.startsWith("_")) {
// Fields that start with _ are considered reserved and ignored by Kibana. See https://github.com/elastic/kibana/issues/2551
key = FIRST_UNDERSCORE_PATTERN.matcher(key).replaceFirst("");
}
return delegate.tagKey(key);
}
}

View File

@ -17,6 +17,8 @@ class MeterTimeSeriesMetadata implements TimeSeriesMetadata {
@Getter @Getter
private TimeSeriesMetric metric; private TimeSeriesMetric metric;
@Getter
private List<String> keys;
static final List<PropertyMetadata> properties = new ArrayList<>(); static final List<PropertyMetadata> properties = new ArrayList<>();
@ -110,12 +112,19 @@ class MeterTimeSeriesMetadata implements TimeSeriesMetadata {
property.setValueType(new DoubleType()); property.setValueType(new DoubleType());
properties.add(property); properties.add(property);
} }
} }
@Override @Override
public List<PropertyMetadata> getProperties() { public List<PropertyMetadata> getProperties() {
return new ArrayList<>(properties);
List<PropertyMetadata> metadata = new ArrayList<>(properties);
for (String key : keys) {
SimplePropertyMetadata property = new SimplePropertyMetadata();
property.setId(key);
property.setName(key);
property.setValueType(new StringType());
metadata.add(property);
}
return metadata;
} }
} }

View File

@ -9,6 +9,9 @@ import org.jetlinks.community.timeseries.TimeSeriesMetric;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -24,22 +27,26 @@ public class TimeSeriesMeterRegistry extends StepMeterRegistry {
private Map<String, String> customTags; private Map<String, String> customTags;
private List<String> keys = new ArrayList<>();
public TimeSeriesMeterRegistry(TimeSeriesManager timeSeriesManager, public TimeSeriesMeterRegistry(TimeSeriesManager timeSeriesManager,
TimeSeriesMetric metric, TimeSeriesMetric metric,
TimeSeriesRegistryProperties config, TimeSeriesRegistryProperties config,
Map<String, String> customTags) { Map<String, String> customTags,String ...tagKeys) {
super(new TimeSeriesPropertiesPropertiesConfigAdapter(config), Clock.SYSTEM); super(new TimeSeriesPropertiesPropertiesConfigAdapter(config), Clock.SYSTEM);
this.timeSeriesManager = timeSeriesManager; this.timeSeriesManager = timeSeriesManager;
this.metric = metric; this.metric = metric;
this.customTags = customTags; this.customTags = customTags;
keys.addAll(customTags.keySet());
keys.addAll(Arrays.asList(tagKeys));
keys.addAll(config.getCustomTagKeys());
start(DEFAULT_THREAD_FACTORY); start(DEFAULT_THREAD_FACTORY);
} }
@Override @Override
public void start(ThreadFactory threadFactory) { public void start(ThreadFactory threadFactory) {
super.start(threadFactory); super.start(threadFactory);
timeSeriesManager.registerMetadata(MeterTimeSeriesMetadata.of(metric)) timeSeriesManager.registerMetadata(MeterTimeSeriesMetadata.of(metric,keys))
.doOnError(e -> log.error("register metric metadata error", e)) .doOnError(e -> log.error("register metric metadata error", e))
.subscribe((r) -> log.error("register metric [{}] metadata success", metric.getId())); .subscribe((r) -> log.error("register metric [{}] metadata success", metric.getId()));
} }

View File

@ -1,6 +1,7 @@
package org.jetlinks.community.timeseries.micrometer; package org.jetlinks.community.timeseries.micrometer;
import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.config.NamingConvention;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import org.jetlinks.community.micrometer.MeterRegistrySupplier; import org.jetlinks.community.micrometer.MeterRegistrySupplier;
@ -14,6 +15,16 @@ public class TimeSeriesMeterRegistrySupplier implements MeterRegistrySupplier {
private final TimeSeriesManager timeSeriesManager; private final TimeSeriesManager timeSeriesManager;
@Getter
@Setter
private String naming = "elastic";
static Map<String, NamingConvention> namingSupports = new HashMap<>();
static {
namingSupports.put("elastic", new ElasticNamingConvention());
}
@Getter @Getter
@Setter @Setter
private Map<String, String> tags = new HashMap<>(); private Map<String, String> tags = new HashMap<>();
@ -28,8 +39,13 @@ public class TimeSeriesMeterRegistrySupplier implements MeterRegistrySupplier {
} }
@Override @Override
public MeterRegistry getMeterRegistry(String metric) { public MeterRegistry getMeterRegistry(String metric, String... tagKeys) {
return new TimeSeriesMeterRegistry(timeSeriesManager, TimeSeriesMetric.of(metric), metrics.getOrDefault(metric, metrics.get("default")), tags); TimeSeriesMeterRegistry registry = new TimeSeriesMeterRegistry(timeSeriesManager,
TimeSeriesMetric.of(metric),
metrics.getOrDefault(metric, metrics.get("default")),
tags, tagKeys);
registry.config().namingConvention(namingSupports.get(naming));
return registry;
} }
} }

View File

@ -1,7 +1,14 @@
package org.jetlinks.community.timeseries.micrometer; package org.jetlinks.community.timeseries.micrometer;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.actuate.autoconfigure.metrics.export.properties.StepRegistryProperties; import org.springframework.boot.actuate.autoconfigure.metrics.export.properties.StepRegistryProperties;
public class TimeSeriesRegistryProperties extends StepRegistryProperties { import java.util.ArrayList;
import java.util.List;
@Getter
@Setter
public class TimeSeriesRegistryProperties extends StepRegistryProperties {
private List<String> customTagKeys = new ArrayList<>();
} }

View File

@ -25,17 +25,7 @@ class DeviceMessageMeasurement extends StaticMeasurement {
private TimeSeriesManager timeSeriesManager; private TimeSeriesManager timeSeriesManager;
static MeasurementDefinition definition = new MeasurementDefinition() { static MeasurementDefinition definition = MeasurementDefinition.of("quantity", "设备消息量");
@Override
public String getId() {
return "quantity";
}
@Override
public String getName() {
return "设备消息量";
}
};
public DeviceMessageMeasurement(MessageGateway messageGateway, TimeSeriesManager timeSeriesManager) { public DeviceMessageMeasurement(MessageGateway messageGateway, TimeSeriesManager timeSeriesManager) {
super(definition); super(definition);
@ -86,9 +76,9 @@ class DeviceMessageMeasurement extends StaticMeasurement {
static ConfigMetadata historyConfigMetadata = new DefaultConfigMetadata() static ConfigMetadata historyConfigMetadata = new DefaultConfigMetadata()
.add("productId", "设备型号", "", new StringType())
.add("time", "周期", "例如: 1h,10m,30s", new StringType()) .add("time", "周期", "例如: 1h,10m,30s", new StringType())
.add("format", "时间格式", "如: MM-dd:HH", new StringType()) .add("format", "时间格式", "如: MM-dd:HH", new StringType())
.add("productId", "设备型号", "", new StringType())
.add("msgType", "消息类型", "", new StringType()) .add("msgType", "消息类型", "", new StringType())
.add("limit", "最大数据量", "", new IntType()) .add("limit", "最大数据量", "", new IntType())
.add("from", "时间从", "", new DateTimeType()) .add("from", "时间从", "", new DateTimeType())
@ -129,7 +119,6 @@ class DeviceMessageMeasurement extends StaticMeasurement {
query.where("name", "message-count") query.where("name", "message-count")
.is("productId", parameter.getString("productId").orElse(null)) .is("productId", parameter.getString("productId").orElse(null))
.is("msgType", parameter.getString("msgType").orElse(null)) .is("msgType", parameter.getString("msgType").orElse(null))
) )
.limit(parameter.getInt("limit").orElse(1)) .limit(parameter.getInt("limit").orElse(1))
.from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant()))) .from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))

View File

@ -26,7 +26,8 @@ public class DeviceMessageMeasurementProvider extends StaticMeasurementProvider
addMeasurement(new DeviceMessageMeasurement(messageGateway, timeSeriesManager)); addMeasurement(new DeviceMessageMeasurement(messageGateway, timeSeriesManager));
MeterRegistry registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId()); MeterRegistry registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId(),
"target", "msgType", "productId");
//订阅设备消息,用于统计设备消息量 //订阅设备消息,用于统计设备消息量
messageGateway.subscribe("/device/*/message/**") messageGateway.subscribe("/device/*/message/**")

View File

@ -32,8 +32,8 @@ public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider {
addMeasurement(new DeviceStatusRecordMeasurement(instanceService, timeSeriesManager)); addMeasurement(new DeviceStatusRecordMeasurement(instanceService, timeSeriesManager));
MeterRegistry registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId()); MeterRegistry registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId(),
"target", "msgType", "productId");
Map<String, LongAdder> productCounts = new ConcurrentHashMap<>(); Map<String, LongAdder> productCounts = new ConcurrentHashMap<>();
Function<String, LongAdder> counterAdder = productId -> Function<String, LongAdder> counterAdder = productId ->

View File

@ -42,6 +42,7 @@ class DeviceStatusRecordMeasurement
} }
static ConfigMetadata aggConfigMetadata = new DefaultConfigMetadata() static ConfigMetadata aggConfigMetadata = new DefaultConfigMetadata()
.add("productId", "设备型号", "", new StringType())
.add("time", "周期", "例如: 1h,10m,30s", new StringType()) .add("time", "周期", "例如: 1h,10m,30s", new StringType())
.add("format", "时间格式", "如: MM-dd:HH", new StringType()) .add("format", "时间格式", "如: MM-dd:HH", new StringType())
.add("limit", "最大数据量", "", new IntType()) .add("limit", "最大数据量", "", new IntType())
@ -76,7 +77,10 @@ class DeviceStatusRecordMeasurement
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) { public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
return AggregationQueryParam.of() return AggregationQueryParam.of()
.max("value") .max("value")
.filter(query -> query.where("name", "online-count")) .filter(query ->
query.where("name", "online-count")
.is("productId", parameter.getString("productId").orElse(null))
)
.from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-30).atZone(ZoneId.systemDefault()).toInstant()))) .from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-30).atZone(ZoneId.systemDefault()).toInstant())))
.to(parameter.getDate("to").orElse(new Date())) .to(parameter.getDate("to").orElse(new Date()))
.groupBy(parameter.getDuration("time").orElse(Duration.ofDays(1)), .groupBy(parameter.getDuration("time").orElse(Duration.ofDays(1)),