优化监控

This commit is contained in:
zhouhao 2020-02-18 16:07:25 +08:00
parent 157fa284f2
commit 2d48f46e65
19 changed files with 402 additions and 131 deletions

View File

@ -13,6 +13,7 @@ import lombok.Getter;
public enum CommonDimensionDefinition implements DimensionDefinition {
realTime("实时数据"),
history("历史数据"),
current("当前数据"),
agg("聚合数据");
private String name;

View File

@ -1,5 +1,19 @@
package org.jetlinks.community.dashboard;
public interface DimensionDefinition extends Definition {
static DimensionDefinition of(String id, String name) {
return new DimensionDefinition() {
@Override
public String getId() {
return id;
}
@Override
public String getName() {
return name;
}
};
}
}

View File

@ -1,5 +1,19 @@
package org.jetlinks.community.dashboard;
public interface MeasurementDefinition extends Definition {
static MeasurementDefinition of(String id, String name) {
return new MeasurementDefinition() {
@Override
public String getId() {
return id;
}
@Override
public String getName() {
return name;
}
};
}
}

View File

@ -2,10 +2,12 @@ package org.jetlinks.community.dashboard;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
/**
* 指标维度,: 每小时,服务器1
*
* @author zhouhao
*/
public interface MeasurementDimension {
@ -18,6 +20,6 @@ public interface MeasurementDimension {
boolean isRealTime();
Flux<MeasurementValue> getValue(MeasurementParameter parameter);
Publisher<? extends MeasurementValue> getValue(MeasurementParameter parameter);
}

View File

@ -69,7 +69,7 @@ public class DashboardController {
.flatMap(dash -> dash.getObject(request.getObject()))
.flatMap(obj -> obj.getMeasurement(request.getMeasurement()))
.flatMap(meas -> meas.getDimension(request.getDimension()))
.filter(dim -> !dim.isRealTime()) //实时数据请使用
.filter(dim -> !dim.isRealTime()) //实时数据请使用EventSource方式
.flatMapMany(dim -> dim.getValue(MeasurementParameter.of(request.getParams())))
.map(val -> DashboardMeasurementResponse.of(request.getGroup(), val)));
}

View File

@ -17,13 +17,15 @@ public class DimensionInfo {
private ConfigMetadata params;
public static DimensionInfo of(MeasurementDimension dimension){
DimensionInfo dimensionInfo=new DimensionInfo();
private boolean realTime;
public static DimensionInfo of(MeasurementDimension dimension) {
DimensionInfo dimensionInfo = new DimensionInfo();
dimensionInfo.setId(dimension.getDefinition().getId());
dimensionInfo.setName(dimension.getDefinition().getName());
dimensionInfo.setParams(dimension.getParams());
dimensionInfo.setType(dimension.getValueType());
dimensionInfo.setRealTime(dimension.isRealTime());
return dimensionInfo;
}
}

View File

@ -36,7 +36,7 @@ public enum TermTypeEnum {
public QueryBuilder process(Term term) {
Object between = null;
Object and = null;
List values = TermCommonUtils.convertToList(term.getValue());
List<?> values = TermCommonUtils.convertToList(term.getValue());
if (values.size() > 0) {
between = values.get(0);
}

View File

@ -6,6 +6,7 @@ import org.hswebframework.ezorm.core.param.Term;
import org.springframework.stereotype.Component;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;
/**
@ -19,6 +20,7 @@ public class DefaultLinkTypeParser implements LinkTypeParser {
@Override
public BoolQueryBuilder process(Term term, Consumer<Term> consumer, BoolQueryBuilder queryBuilders) {
if ("or".equalsIgnoreCase(term.getType().name())) {
handleOr(queryBuilders, term, consumer);
} else if ("and".equalsIgnoreCase(term.getType().name())) {
@ -31,11 +33,11 @@ public class DefaultLinkTypeParser implements LinkTypeParser {
private void handleOr(BoolQueryBuilder queryBuilders, Term term, Consumer<Term> consumer) {
consumer.accept(term);
if (term.getTerms().isEmpty()) {
if (term.getTerms().isEmpty() && term.getValue() != null) {
parser.process(() -> term, queryBuilders::should);
} else {
BoolQueryBuilder nextQuery = QueryBuilders.boolQuery();
LinkedList<Term> terms = ((LinkedList<Term>) term.getTerms());
List<Term> terms = ( term.getTerms());
terms.forEach(t -> process(t, consumer, nextQuery));
queryBuilders.should(nextQuery);
}
@ -43,22 +45,13 @@ public class DefaultLinkTypeParser implements LinkTypeParser {
private void handleAnd(BoolQueryBuilder queryBuilders, Term term, Consumer<Term> consumer) {
consumer.accept(term);
if (term.getTerms().isEmpty()) {
if (term.getTerms().isEmpty()&& term.getValue() != null) {
parser.process(() -> term, queryBuilders::must);
} else {
BoolQueryBuilder nextQuery = QueryBuilders.boolQuery();
LinkedList<Term> terms = ((LinkedList<Term>) term.getTerms());
List<Term> terms = term.getTerms();
terms.forEach(t -> process(t, consumer, nextQuery));
queryBuilders.must(nextQuery);
}
}
private static Term getLast(LinkedList<Term> terms) {
int index = terms.indexOf(terms.getLast());
while (index >= 0) {
if (terms.get(index).getTerms().isEmpty()) break;
index--;
}
return terms.get(index);
}
}

View File

@ -28,7 +28,7 @@ public class TermCommonUtils {
}
public static Object getStandardsTermValue(List<Object> value) {
if (value.size() > 0 && value.size() < 2) {
if (value.size() == 1) {
return value.get(0);
}
return value;

View File

@ -5,69 +5,77 @@ import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.AllArgsConstructor;
@AllArgsConstructor
import java.util.concurrent.atomic.AtomicReference;
class MicrometerDeviceGatewayMonitor implements DeviceGatewayMonitor {
MeterRegistry registry;
String id;
String[] tags;
@Override
public void totalConnection(long total) {
private AtomicReference<Long> totalRef = new AtomicReference<>(0L);
public MicrometerDeviceGatewayMonitor(MeterRegistry registry, String id, String[] tags) {
this.registry = registry;
this.id = id;
this.tags = tags;
Gauge
.builder(id, total, Number::doubleValue)
.builder(id, totalRef, AtomicReference::get)
.tags(tags)
.tag("target", "connection")
.register(registry);
this.connected = getCounter("connected");
this.rejected = getCounter("rejected");
this.disconnected = getCounter("disconnected");
this.sentMessage = getCounter("sentMessage");
this.receivedMessage = getCounter("receivedMessage");
}
final Counter connected;
final Counter rejected;
final Counter disconnected;
final Counter receivedMessage;
final Counter sentMessage;
private Counter getCounter(String target) {
return Counter
.builder(id)
.tags(tags)
.tag("target", target)
.register(registry);
}
@Override
public void totalConnection(long total) {
totalRef.set(Math.max(0, total));
}
@Override
public void connected() {
Counter
.builder(id)
.tags(tags)
.tag("target", "connected")
.register(registry)
.increment();
connected.increment();
}
@Override
public void rejected() {
Counter
.builder(id)
.tags(tags)
.tag("target", "rejected")
.register(registry)
.increment();
rejected.increment();
}
@Override
public void disconnected() {
Counter
.builder(id)
.tags(tags)
.tag("target", "disconnected")
.register(registry)
.increment();
disconnected.increment();
}
@Override
public void receivedMessage() {
Counter
.builder(id)
.tags(tags)
.tag("target", "receivedMessage")
.register(registry)
receivedMessage
.increment();
}
@Override
public void sentMessage() {
Counter
.builder(id)
.tags(tags)
.tag("target", "sentMessage")
.register(registry)
sentMessage
.increment();
}
}

View File

@ -5,59 +5,74 @@ import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.AllArgsConstructor;
import java.util.concurrent.atomic.AtomicReference;
@AllArgsConstructor
class MicrometerMessageGatewayMonitor implements MessageGatewayMonitor {
MeterRegistry registry;
String id;
String[] tags;
private final AtomicReference<Long> totalRef = new AtomicReference<>(0L);
@Override
public void totalSession(long sessionNumber) {
public MicrometerMessageGatewayMonitor(MeterRegistry registry, String id, String[] tags) {
this.registry = registry;
this.id = id;
this.tags = tags;
Gauge
.builder(id, sessionNumber, Number::doubleValue)
.builder(id, totalRef, AtomicReference::get)
.tags(tags)
.tag("target", "sessionNumber")
.register(registry);
this.acceptedSession=getCounter("acceptedSession");
this.closedSession=getCounter("closedSession");
this.subscribed=getCounter("subscribed");
this.unsubscribed=getCounter("unsubscribed");
this.acceptMessage=getCounter("acceptMessage");
}
@Override
public void acceptedSession() {
Counter
public void totalSession(long sessionNumber) {
totalRef.set(Math.max(0, sessionNumber));
}
final Counter acceptedSession;
final Counter closedSession;
final Counter subscribed;
final Counter unsubscribed;
final Counter acceptMessage;
private Counter getCounter(String target) {
return Counter
.builder(id)
.tags(tags)
.tag("target", "acceptedSession")
.register(registry)
.tag("target", target)
.register(registry);
}
@Override
public void acceptedSession() {
acceptedSession
.increment();
}
@Override
public void closedSession() {
Counter
.builder(id)
.tags(tags)
.tag("target", "closedSession")
.register(registry)
closedSession
.increment();
}
@Override
public void subscribed() {
Counter
.builder(id)
.tags(tags)
.tag("target", "subscribed")
.register(registry)
subscribed
.increment();
}
@Override
public void unsubscribed() {
Counter
.builder(id)
.tags(tags)
.tag("target", "unsubscribed")
.register(registry)
unsubscribed
.increment();
}
@ -67,18 +82,14 @@ class MicrometerMessageGatewayMonitor implements MessageGatewayMonitor {
.builder(id)
.tags(tags)
.tag("target", "dispatched")
.tag("connector",connector)
.tag("connector", connector)
.register(registry)
.increment();
}
@Override
public void acceptMessage() {
Counter
.builder(id)
.tags(tags)
.tag("target", "acceptMessage")
.register(registry)
acceptedSession
.increment();
}

View File

@ -3,6 +3,9 @@ package org.jetlinks.community.network.mqtt.gateway.device;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.community.gateway.monitor.GatewayMonitors;
import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
import org.jetlinks.core.device.AuthenticationResponse;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
@ -30,10 +33,11 @@ import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
@Slf4j
class MqttServerDeviceGateway implements DeviceGateway {
class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGateway {
@Getter
private String id;
@ -46,11 +50,16 @@ class MqttServerDeviceGateway implements DeviceGateway {
private DecodedClientMessageHandler messageHandler;
private DeviceGatewayMonitor gatewayMonitor;
private LongAdder counter = new LongAdder();
public MqttServerDeviceGateway(String id,
DeviceRegistry registry,
DeviceSessionManager sessionManager,
MqttServer mqttServer,
DecodedClientMessageHandler messageHandler) {
this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor(id);
this.id = id;
this.registry = registry;
this.sessionManager = sessionManager;
@ -66,6 +75,11 @@ class MqttServerDeviceGateway implements DeviceGateway {
private Disposable disposable;
@Override
public long totalConnection() {
return counter.sum();
}
private void doStart() {
if (started.getAndSet(true) || disposable != null) {
return;
@ -75,12 +89,16 @@ class MqttServerDeviceGateway implements DeviceGateway {
.filter(conn -> {
if (!started.get()) {
conn.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
gatewayMonitor.rejected();
}
return started.get();
})
.flatMap(con -> Mono.justOrEmpty(con.getAuth())
//没有认证信息,则拒绝连接.
.switchIfEmpty(Mono.fromRunnable(() -> con.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED)))
.switchIfEmpty(Mono.fromRunnable(() -> {
con.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
gatewayMonitor.rejected();
}))
.flatMap(auth ->
registry.getDevice(con.getClientId())
.flatMap(device -> device
@ -98,22 +116,41 @@ class MqttServerDeviceGateway implements DeviceGateway {
})
))
//设备注册信息不存在,拒绝连接
.switchIfEmpty(Mono.fromRunnable(() -> con.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED)))
.switchIfEmpty(Mono.fromRunnable(() -> {
con.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
gatewayMonitor.rejected();
}))
.onErrorContinue((err, res) -> {
gatewayMonitor.rejected();
con.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
log.error("MQTT连接认证[{}]失败", con.getClientId(), err);
}))
.flatMap(tuple3 -> {
counter.increment();
DeviceOperator device = tuple3.getT1();
AuthenticationResponse resp = tuple3.getT2();
MqttConnection con = tuple3.getT3();
String deviceId = device.getDeviceId();
if (resp.isSuccess()) {
DeviceSession session = new MqttConnectionSession(deviceId, device, getTransport(), con);
DeviceSession session = new MqttConnectionSession(deviceId, device, getTransport(), con) {
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
return super.send(encodedMessage).doOnSuccess(s -> gatewayMonitor.sentMessage());
}
};
sessionManager.register(session);
con.onClose(conn -> sessionManager.unregister(deviceId));
gatewayMonitor.connected();
gatewayMonitor.totalConnection(counter.sum());
//监听断开连接
con.onClose(conn -> {
counter.decrement();
sessionManager.unregister(deviceId);
gatewayMonitor.disconnected();
gatewayMonitor.totalConnection(counter.sum());
});
return Mono.just(Tuples.of(con.accept(), device, session));
} else {
gatewayMonitor.rejected();
log.warn("MQTT客户端认证[{}]失败:{}", deviceId, resp.getMessage());
}
return Mono.empty();
@ -123,6 +160,7 @@ class MqttServerDeviceGateway implements DeviceGateway {
.handleMessage()
.filter(pb -> started.get())
.takeWhile(pub -> disposable != null)
.doOnNext(msg -> gatewayMonitor.receivedMessage())
.flatMap(publishing -> tp.getT2()
.getProtocol()
.flatMap(protocol -> protocol.getMessageCodec(getTransport()))

View File

@ -87,7 +87,6 @@ public class MeterTimeSeriesData implements TimeSeriesData {
}
public MeterTimeSeriesData write(Timer timer) {
write((Meter) timer);
data.put("count", timer.count());
data.put("sum", timer.totalTime(TimeUnit.MILLISECONDS));
data.put("mean", timer.mean(TimeUnit.MILLISECONDS));
@ -96,7 +95,6 @@ public class MeterTimeSeriesData implements TimeSeriesData {
}
public MeterTimeSeriesData write(DistributionSummary summary) {
write((Meter) summary);
data.put("count", summary.count());
data.put("sum", summary.totalAmount());
data.put("mean", summary.mean());
@ -113,6 +111,7 @@ public class MeterTimeSeriesData implements TimeSeriesData {
public static MeterTimeSeriesData of(Meter meter) {
MeterTimeSeriesData data = new MeterTimeSeriesData();
data.write(meter);
meter.match(
data::write,
data::write,

View File

@ -6,6 +6,7 @@ import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
@ -13,6 +14,8 @@ import org.jetlinks.community.device.message.DeviceMessageUtils;
import org.jetlinks.community.gateway.MessageGateway;
import org.jetlinks.community.gateway.Subscription;
import org.jetlinks.community.timeseries.TimeSeriesService;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.StringType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -77,6 +80,9 @@ class DevicePropertyMeasurement extends StaticMeasurement {
.filter(msg -> msg.containsKey(metadata.getId()))
.map(msg -> SimpleMeasurementValue.of(createValue(msg.get(metadata.getId())), System.currentTimeMillis()));
}
static ConfigMetadata configMetadata = new DefaultConfigMetadata()
.add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"))
.add("history", "历史数据量", "查询出历史数据后开始推送实时数据", new IntType().min(0).expand("defaultValue", 10));
/**
* 实时设备事件
@ -95,8 +101,7 @@ class DevicePropertyMeasurement extends StaticMeasurement {
@Override
public ConfigMetadata getParams() {
// TODO: 2020/1/15
return null;
return configMetadata;
}
@Override

View File

@ -28,7 +28,7 @@ class DeviceMessageMeasurement extends StaticMeasurement {
static MeasurementDefinition definition = new MeasurementDefinition() {
@Override
public String getId() {
return "device-message-quantity";
return "quantity";
}
@Override
@ -49,8 +49,7 @@ class DeviceMessageMeasurement extends StaticMeasurement {
static DataType valueType = new IntType();
static ConfigMetadata realTimeConfigMetadata = new DefaultConfigMetadata()
.add("interval", "数据统计周期", "例如: 1s,10s", new StringType())
;
.add("interval", "数据统计周期", "例如: 1s,10s", new StringType());
class RealTimeMessageDimension implements MeasurementDimension {
@ -89,10 +88,11 @@ class DeviceMessageMeasurement extends StaticMeasurement {
static ConfigMetadata historyConfigMetadata = new DefaultConfigMetadata()
.add("time", "周期", "例如: 1h,10m,30s", new StringType())
.add("format", "时间格式", "如: MM-dd:HH", new StringType())
.add("productId", "设备型号", "", new StringType())
.add("msgType", "消息类型", "", new StringType())
.add("limit", "最大数据量", "", new IntType())
.add("from", "时间从", "", new DateTimeType())
.add("to", "时间至", "", new DateTimeType())
;
.add("to", "时间至", "", new DateTimeType());
class AggMessageDimension implements MeasurementDimension {
@ -125,7 +125,12 @@ class DeviceMessageMeasurement extends StaticMeasurement {
.groupBy(parameter.getDuration("time").orElse(Duration.ofHours(1)),
"time",
parameter.getString("format").orElse("MM-dd:HH"))
.filter(query -> query.where("name", "message-count"))
.filter(query ->
query.where("name", "message-count")
.is("productId", parameter.getString("productId").orElse(null))
.is("msgType", parameter.getString("msgType").orElse(null))
)
.limit(parameter.getInt("limit").orElse(1))
.from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))
.to(parameter.getDate("to").orElse(new Date()))
@ -137,4 +142,5 @@ class DeviceMessageMeasurement extends StaticMeasurement {
}
}
}

View File

@ -4,10 +4,13 @@ import io.micrometer.core.instrument.MeterRegistry;
import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
import org.jetlinks.community.device.measurements.DeviceDashboardDefinition;
import org.jetlinks.community.device.measurements.DeviceObjectDefinition;
import org.jetlinks.community.device.message.DeviceMessageUtils;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.gateway.MessageGateway;
import org.jetlinks.community.gateway.TopicMessage;
import org.jetlinks.community.micrometer.MeterRegistryManager;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.core.message.DeviceMessage;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
@ -22,15 +25,28 @@ public class DeviceMessageMeasurementProvider extends StaticMeasurementProvider
super(DeviceDashboardDefinition.instance, DeviceObjectDefinition.message);
addMeasurement(new DeviceMessageMeasurement(messageGateway, timeSeriesManager));
//定时提交设备消息量
MeterRegistry registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId());
//订阅设备消息,用于统计设备消息量
messageGateway.subscribe("/device/*/message/**")
.window(Duration.ofSeconds(5))
.flatMap(Flux::count)
.subscribe(total -> registry
.counter("message-count")
.increment(total));
.map(this::convertTags)
.subscribe(tags -> registry
.counter("message-count", tags)
.increment());
}
static final String[] empty = new String[0];
private String[] convertTags(TopicMessage msg) {
DeviceMessage message = DeviceMessageUtils.convert(msg).orElse(null);
if (message == null) {
return empty;
}
return new String[]{
"msgType", message.getMessageType().name().toLowerCase(),
"productId", message.getHeader("productId").map(String::valueOf).orElse("unknown")
};
}
}

View File

@ -31,17 +31,7 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement {
private TimeSeriesManager timeSeriesManager;
static MeasurementDefinition definition = new MeasurementDefinition() {
@Override
public String getId() {
return "device-status";
}
@Override
public String getName() {
return "设备状态";
}
};
static MeasurementDefinition definition = MeasurementDefinition.of("change", "设备状态变更");
static ConfigMetadata configMetadata = new DefaultConfigMetadata()
.add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"));
@ -55,7 +45,7 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement {
this.messageGateway = messageGateway;
this.timeSeriesManager = timeSeriesManager;
addDimension(new RealTimeDeviceStateDimension());
addDimension(new HistoryDeviceStateDimension());
addDimension(new CountDeviceStateDimension());
}
static ConfigMetadata historyConfigMetadata = new DefaultConfigMetadata()
@ -66,16 +56,18 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement {
.addElement(EnumType.Element.of("offline", "离线")))
.add("limit", "最大数据量", "", new IntType())
.add("from", "时间从", "", new DateTimeType())
.add("to", "时间至", "", new DateTimeType())
;
.add("to", "时间至", "", new DateTimeType());
static DataType historyValueType = new IntType();
class HistoryDeviceStateDimension implements MeasurementDimension {
/**
* 设备状态统计
*/
class CountDeviceStateDimension implements MeasurementDimension {
@Override
public DimensionDefinition getDefinition() {
return CommonDimensionDefinition.history;
return CommonDimensionDefinition.agg;
}
@Override
@ -101,7 +93,10 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement {
.groupBy(parameter.getDuration("time").orElse(Duration.ofHours(1)),
"time",
parameter.getString("format").orElse("MM-dd:HH"))
.filter(query -> query.where("name", parameter.getString("type").orElse("online")))
.filter(query ->
query.where("name", parameter.getString("type").orElse("online"))
.is("productId", parameter.getString("productId").orElse(null))
)
.limit(parameter.getInt("limit").orElse(1))
.from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))
.to(parameter.getDate("to").orElse(new Date()))

View File

@ -1,42 +1,73 @@
package org.jetlinks.community.device.measurements.status;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
import org.jetlinks.community.device.measurements.DeviceDashboardDefinition;
import org.jetlinks.community.device.measurements.DeviceObjectDefinition;
import org.jetlinks.community.device.message.DeviceMessageUtils;
import org.jetlinks.community.device.service.LocalDeviceInstanceService;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.gateway.MessageGateway;
import org.jetlinks.community.gateway.TopicMessage;
import org.jetlinks.community.micrometer.MeterRegistryManager;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
@Component
public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider {
public DeviceStatusMeasurementProvider(MeterRegistryManager registryManager,
LocalDeviceInstanceService instanceService,
TimeSeriesManager timeSeriesManager,
MessageGateway messageGateway) {
super(DeviceDashboardDefinition.instance, DeviceObjectDefinition.status);
addMeasurement(new DeviceStatusChangeMeasurement(timeSeriesManager, messageGateway));
addMeasurement(new DeviceStatusRecordMeasurement(instanceService, timeSeriesManager));
MeterRegistry registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId());
Map<String, LongAdder> productCounts = new ConcurrentHashMap<>();
Function<String, LongAdder> counterAdder = productId ->
productCounts.computeIfAbsent(productId, __id -> {
LongAdder adder = new LongAdder();
Gauge.builder("online-count", adder, LongAdder::sum)
.tag("productId", __id)
.register(registry);
return adder;
});
//上线
messageGateway.subscribe("/device/*/online")
.window(Duration.ofSeconds(5))
.flatMap(Flux::count)
.subscribe(total -> registry
.counter("online")
.increment(total));
.map(this::parseProductId)
.subscribe(productId -> {
counterAdder.apply(productId).increment();
registry
.counter("online", "productId", productId)
.increment();
});
//下线
messageGateway.subscribe("/device/*/offline")
.window(Duration.ofSeconds(5))
.flatMap(Flux::count)
.subscribe(total -> registry
.counter("offline")
.increment(total));
.map(this::parseProductId)
.subscribe(productId -> {
counterAdder.apply(productId).decrement();
registry
.counter("offline", "productId", productId)
.increment();
});
}
private String parseProductId(TopicMessage msg) {
return DeviceMessageUtils.convert(msg)
.flatMap(deviceMessage -> deviceMessage.getHeader("productId"))
.map(String::valueOf).orElse("unknown");
}
}

View File

@ -0,0 +1,136 @@
package org.jetlinks.community.device.measurements.status;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.EnumType;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.entity.DeviceInstanceEntity;
import org.jetlinks.community.device.enums.DeviceState;
import org.jetlinks.community.device.service.LocalDeviceInstanceService;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
class DeviceStatusRecordMeasurement
extends StaticMeasurement {
public LocalDeviceInstanceService instanceService;
private TimeSeriesManager timeSeriesManager;
static MeasurementDefinition definition = MeasurementDefinition.of("record", "设备状态记录");
public DeviceStatusRecordMeasurement(LocalDeviceInstanceService deviceInstanceService,
TimeSeriesManager timeSeriesManager) {
super(definition);
this.timeSeriesManager = timeSeriesManager;
this.instanceService = deviceInstanceService;
addDimension(new CurrentNumberOfDeviceDimension());
addDimension(new AggNumberOfOnlineDeviceDimension());
}
static ConfigMetadata aggConfigMetadata = new DefaultConfigMetadata()
.add("time", "周期", "例如: 1h,10m,30s", new StringType())
.add("format", "时间格式", "如: MM-dd:HH", new StringType())
.add("limit", "最大数据量", "", new IntType())
.add("from", "时间从", "", new DateTimeType())
.add("to", "时间至", "", new DateTimeType());
//历史在线数量
class AggNumberOfOnlineDeviceDimension implements MeasurementDimension {
@Override
public DimensionDefinition getDefinition() {
return DimensionDefinition.of("aggOnline", "历史在线数");
}
@Override
public DataType getValueType() {
return new IntType();
}
@Override
public ConfigMetadata getParams() {
return aggConfigMetadata;
}
@Override
public boolean isRealTime() {
return false;
}
@Override
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
return AggregationQueryParam.of()
.max("value")
.filter(query -> query.where("name", "online-count"))
.from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-30).atZone(ZoneId.systemDefault()).toInstant())))
.to(parameter.getDate("to").orElse(new Date()))
.groupBy(parameter.getDuration("time").orElse(Duration.ofDays(1)),
parameter.getString("format").orElse("yyyy-MM-dd"))
.limit(parameter.getInt("limit").orElse(10))
.execute(timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceMetrics())::aggregation)
.map(data ->
SimpleMeasurementValue.of(data.getInt("value").orElse(0),
data.getString("time").orElse("-"),
System.currentTimeMillis()));
}
}
static ConfigMetadata currentMetadata = new DefaultConfigMetadata()
.add("productId", "设备型号", "", new StringType())
.add("state", "状态", "online", new EnumType()
.addElement(EnumType.Element.of(DeviceState.online.getValue(), DeviceState.online.getText()))
.addElement(EnumType.Element.of(DeviceState.offline.getValue(), DeviceState.offline.getText()))
.addElement(EnumType.Element.of(DeviceState.notActive.getValue(), DeviceState.notActive.getText()))
);
//当前设备数量
class CurrentNumberOfDeviceDimension implements MeasurementDimension {
@Override
public DimensionDefinition getDefinition() {
return CommonDimensionDefinition.current;
}
@Override
public DataType getValueType() {
return new IntType();
}
@Override
public ConfigMetadata getParams() {
return currentMetadata;
}
@Override
public boolean isRealTime() {
return false;
}
@Override
public Mono<MeasurementValue> getValue(MeasurementParameter parameter) {
return instanceService
.createQuery()
.and(DeviceInstanceEntity::getProductId, parameter.getString("productId").orElse(null))
.and(DeviceInstanceEntity::getState, parameter.get("state", DeviceState.class).orElse(null))
.count()
.map(val -> SimpleMeasurementValue.of(val, System.currentTimeMillis()));
}
}
}