From efe03f73189feb0da4170e2e0b5b2a4d6b3b5ca8 Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Tue, 25 May 2021 16:51:44 +0800 Subject: [PATCH 1/4] =?UTF-8?q?websocket=E6=94=AF=E6=8C=81ping=20pong?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../community/gateway/external/Message.java | 9 ++++++-- .../socket/WebSocketMessagingHandler.java | 21 ++++++++++++++++++- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/Message.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/Message.java index ac8cb430..357446ab 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/Message.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/Message.java @@ -35,11 +35,16 @@ public interface Message { return new SimpleMessage(id, null, null, Type.complete, null); } + static Message pong(String id) { + return new SimpleMessage(id, null, null, Type.pong, null); + } + enum Type { authError, result, error, - complete + complete, + ping, + pong } - } diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandler.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandler.java index 3d8e1ce0..b50b19c8 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandler.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/socket/WebSocketMessagingHandler.java @@ -11,9 +11,11 @@ import org.hswebframework.web.logger.ReactiveLogger; import org.jetlinks.community.gateway.external.Message; import org.jetlinks.community.gateway.external.MessagingManager; import org.jetlinks.community.gateway.external.SubscribeRequest; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.util.StringUtils; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketSession; import reactor.core.Disposable; import reactor.core.publisher.Mono; @@ -64,8 +66,25 @@ public class WebSocketMessagingHandler implements WebSocketHandler { .receive() .doOnNext(message -> { try { + if (message.getType() == WebSocketMessage.Type.PONG) { + return; + } + if (message.getType() == WebSocketMessage.Type.PING) { + session + .send(Mono.just(session.pongMessage(DataBufferFactory::allocateBuffer))) + .subscribe(); + return; + } MessagingRequest request = JSON.parseObject(message.getPayloadAsText(), MessagingRequest.class); - if (request == null || request.getType() == MessagingRequest.Type.ping) { + if (request == null) { + return; + } + if (request.getType() == MessagingRequest.Type.ping) { + session + .send(Mono.just(session.textMessage(JSON.toJSONString( + Message.pong(request.getId()) + )))) + .subscribe(); return; } if (StringUtils.isEmpty(request.getId())) { From d8d4668da19e42dc9bee226d0d2800113333bdad Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Wed, 26 May 2021 10:14:04 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=AF=8F=E6=97=A5?= =?UTF-8?q?=E5=9C=A8=E7=BA=BF=E6=95=B0=E9=87=8F=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DeviceStatusMeasurementProvider.java | 24 +++++-------------- .../status/DeviceStatusRecordMeasurement.java | 20 ++++++++++------ .../configuration/JetLinksConfiguration.java | 8 +++++-- 3 files changed, 25 insertions(+), 27 deletions(-) diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusMeasurementProvider.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusMeasurementProvider.java index 5a26350a..2071d11c 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusMeasurementProvider.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusMeasurementProvider.java @@ -23,18 +23,8 @@ import java.util.function.Function; @Component public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider { - private MeterRegistry registry; - Map productCounts = new ConcurrentHashMap<>(); - - Function counterAdder = productId -> - productCounts.computeIfAbsent(productId, __id -> { - LongAdder adder = new LongAdder(); - Gauge.builder("online-count", adder, LongAdder::sum) - .tag("productId", __id) - .register(registry); - return adder; - }); + private final MeterRegistry registry; public DeviceStatusMeasurementProvider(MeterRegistryManager registryManager, LocalDeviceInstanceService instanceService, @@ -47,14 +37,13 @@ public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider { addMeasurement(new DeviceStatusRecordMeasurement(instanceService, timeSeriesManager)); registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId(), - "target", "msgType", "productId"); + "target", "msgType", "productId"); } @Subscribe("/device/*/*/online") - public Mono incrementOnline(DeviceMessage msg){ - return Mono.fromRunnable(()->{ + public Mono incrementOnline(DeviceMessage msg) { + return Mono.fromRunnable(() -> { String productId = parseProductId(msg); - counterAdder.apply(productId).increment(); registry .counter("online", "productId", productId) .increment(); @@ -62,10 +51,9 @@ public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider { } @Subscribe("/device/*/*/offline") - public Mono incrementOffline(DeviceMessage msg){ - return Mono.fromRunnable(()->{ + public Mono incrementOffline(DeviceMessage msg) { + return Mono.fromRunnable(() -> { String productId = parseProductId(msg); - // counterAdder.apply(productId).decrement(); registry .counter("offline", "productId", productId) .increment(); diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusRecordMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusRecordMeasurement.java index 3037c8d8..15740b74 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusRecordMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusRecordMeasurement.java @@ -81,22 +81,28 @@ class DeviceStatusRecordMeasurement String format = parameter.getString("format").orElse("yyyy年MM月dd日"); DateTimeFormatter formatter = DateTimeFormat.forPattern(format); - return AggregationQueryParam.of() + return AggregationQueryParam + .of() .max("value") .filter(query -> - query.where("name", "online-count") - .is("productId", parameter.getString("productId").orElse(null)) + query.where("name", "gateway-server-session") ) - .from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-30).atZone(ZoneId.systemDefault()).toInstant()))) + .from(parameter + .getDate("from") + .orElse(Date.from(LocalDateTime + .now() + .plusDays(-30) + .atZone(ZoneId.systemDefault()) + .toInstant()))) .to(parameter.getDate("to").orElse(new Date())) .groupBy(parameter.getInterval("time").orElse(Interval.ofDays(1)), - parameter.getString("format").orElse("yyyy年MM月dd日")) + parameter.getString("format").orElse("yyyy年MM月dd日")) .limit(parameter.getInt("limit").orElse(10)) .execute(timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceMetrics())::aggregation) .map(data -> { long ts = data.getString("time") - .map(time -> DateTime.parse(time, formatter).getMillis()) - .orElse(System.currentTimeMillis()); + .map(time -> DateTime.parse(time, formatter).getMillis()) + .orElse(System.currentTimeMillis()); return SimpleMeasurementValue.of( data.get("value").orElse(0), data.getString("time", ""), diff --git a/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java index 0ddbd25e..9b4bc624 100644 --- a/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java +++ b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java @@ -12,6 +12,8 @@ import org.hswebframework.web.authorization.token.redis.RedisUserTokenManager; import org.jetlinks.community.device.entity.DeviceInstanceEntity; import org.jetlinks.community.device.entity.DeviceProductEntity; import org.jetlinks.community.device.service.AutoDiscoverDeviceRegistry; +import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric; +import org.jetlinks.community.micrometer.MeterRegistryManager; import org.jetlinks.core.ProtocolSupports; import org.jetlinks.core.cluster.ClusterManager; import org.jetlinks.core.config.ConfigStorageManager; @@ -145,8 +147,10 @@ public class JetLinksConfiguration { } @Bean - public GatewayServerMonitor gatewayServerMonitor(JetLinksProperties properties, MeterRegistry registry) { - GatewayServerMetrics metrics = new MicrometerGatewayServerMetrics(properties.getServerId(), registry); + public GatewayServerMonitor gatewayServerMonitor(JetLinksProperties properties, MeterRegistryManager registry) { + GatewayServerMetrics metrics = new MicrometerGatewayServerMetrics(properties.getServerId(), + registry.getMeterRegister(DeviceTimeSeriesMetric + .deviceMetrics().getId())); return new GatewayServerMonitor() { @Override From c1363bd555d881558684d07c6d930e7fb9b1c2aa Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Wed, 26 May 2021 11:14:42 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=8C=87=E5=AE=9A?= =?UTF-8?q?=E5=B1=9E=E6=80=A7=E6=9F=A5=E8=AF=A2=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DevicePropertiesMeasurement.java | 49 +++++++++++++------ .../data/AbstractDeviceDataStoragePolicy.java | 19 +++++++ .../data/DefaultDeviceDataService.java | 5 +- .../service/data/DeviceDataService.java | 3 +- .../service/data/DeviceDataStoragePolicy.java | 3 +- .../data/NoneDeviceDataStoragePolicy.java | 3 +- ...meSeriesColumnDeviceDataStoragePolicy.java | 24 ++++----- ...SeriesRowDeviceDataStoreStoragePolicy.java | 47 +++++++++--------- 8 files changed, 93 insertions(+), 60 deletions(-) diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java index 1798c50a..8871153a 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java @@ -12,13 +12,12 @@ import org.jetlinks.community.dashboard.*; import org.jetlinks.community.dashboard.supports.StaticMeasurement; import org.jetlinks.community.device.service.data.DeviceDataService; import org.jetlinks.community.gateway.DeviceMessageUtils; +import org.jetlinks.reactor.ql.utils.CastUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.stream.Collectors; @Slf4j class DevicePropertiesMeasurement extends StaticMeasurement { @@ -45,16 +44,20 @@ class DevicePropertiesMeasurement extends StaticMeasurement { } - Flux fromHistory(String deviceId, int history) { - return history <= 0 ? Flux.empty() : QueryParamEntity.newQuery() + Flux fromHistory(String deviceId, int history, Set properties) { + return history <= 0 + ? Flux.empty() + : QueryParamEntity + .newQuery() .doPaging(0, history) - .execute(q -> dataService.queryEachProperties(deviceId, q)) + .execute(q -> dataService.queryEachProperties(deviceId, q, properties.toArray(new String[0]))) .map(data -> SimpleMeasurementValue.of(data, data.getTimestamp())) .sort(MeasurementValue.sort()); } Map createValue(String property, Object value) { - return metadata.getProperty(property) + return metadata + .getProperty(property) .map(meta -> { Map values = new HashMap<>(); DataType type = meta.getValueType(); @@ -74,7 +77,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement { }); } - Flux fromRealTime(String deviceId) { + Flux fromRealTime(String deviceId, Set properties) { Subscription subscription = Subscription.of( "realtime-device-properties-measurement", @@ -88,7 +91,9 @@ class DevicePropertiesMeasurement extends StaticMeasurement { Map index = new HashMap<>(); int idx = 0; for (PropertyMetadata prop : props) { - index.put(prop.getId(), idx++); + if (properties.isEmpty() || properties.contains(prop.getId())) { + index.put(prop.getId(), idx++); + } } return eventBus @@ -106,6 +111,16 @@ class DevicePropertiesMeasurement extends StaticMeasurement { static ConfigMetadata configMetadata = new DefaultConfigMetadata() .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector")); + static Set getPropertiesFromParameter(MeasurementParameter parameter) { + return parameter + .get("properties") + .map(CastUtils::castArray) + .orElse(Collections.emptyList()) + .stream() + .map(String::valueOf) + .collect(Collectors.toSet()); + } + /** * 历史 */ @@ -136,11 +151,12 @@ class DevicePropertiesMeasurement extends StaticMeasurement { @Override public Flux getValue(MeasurementParameter parameter) { - return Mono.justOrEmpty(parameter.getString("deviceId")) + return Mono + .justOrEmpty(parameter.getString("deviceId")) .flatMapMany(deviceId -> { int history = parameter.getInt("history").orElse(1); - //合并历史数据和实时数据 - return fromHistory(deviceId, history); + + return fromHistory(deviceId, history, getPropertiesFromParameter(parameter)); }); } } @@ -175,16 +191,17 @@ class DevicePropertiesMeasurement extends StaticMeasurement { @Override public Flux getValue(MeasurementParameter parameter) { - return Mono.justOrEmpty(parameter.getString("deviceId")) + return Mono + .justOrEmpty(parameter.getString("deviceId")) .flatMapMany(deviceId -> { int history = parameter.getInt("history").orElse(0); //合并历史数据和实时数据 return Flux.concat( //查询历史数据 - fromHistory(deviceId, history) + fromHistory(deviceId, history, getPropertiesFromParameter(parameter)) , //从消息网关订阅实时事件消息 - fromRealTime(deviceId) + fromRealTime(deviceId, getPropertiesFromParameter(parameter)) ); }); } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java index ebaa8d0d..ca87cce3 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java @@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Collectors; import static org.jetlinks.community.device.service.data.StorageConstants.propertyIsIgnoreStorage; import static org.jetlinks.community.device.service.data.StorageConstants.propertyIsJsonStringStorage; @@ -511,6 +512,24 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora .flatMap(product -> Mono.zip(Mono.just(product), product.getMetadata())); } + protected List getPropertyMetadata(DeviceMetadata metadata, String... properties) { + if (properties == null || properties.length == 0) { + return metadata.getProperties(); + } + if (properties.length == 1) { + return metadata.getProperty(properties[0]) + .map(Arrays::asList) + .orElseGet(Collections::emptyList); + } + Set ids = new HashSet<>(Arrays.asList(properties)); + return metadata + .getProperties() + .stream() + .filter(prop -> ids.isEmpty() || ids.contains(prop.getId())) + .collect(Collectors.toList()); + } + + private final AtomicInteger nanoInc = new AtomicInteger(); diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java index a0ead1e8..d8fb1c53 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java @@ -90,10 +90,11 @@ public class DefaultDeviceDataService implements DeviceDataService { @Nonnull @Override public Flux queryEachProperties(@Nonnull String deviceId, - @Nonnull QueryParamEntity query) { + @Nonnull QueryParamEntity query, + @Nonnull String... properties) { return this .getDeviceStrategy(deviceId) - .flatMapMany(strategy -> strategy.queryEachProperties(deviceId, query)); + .flatMapMany(strategy -> strategy.queryEachProperties(deviceId, query,properties)); } @Nonnull diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DeviceDataService.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DeviceDataService.java index 3812c454..d8e70831 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DeviceDataService.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DeviceDataService.java @@ -98,7 +98,8 @@ public interface DeviceDataService { */ @Nonnull Flux queryEachProperties(@Nonnull String deviceId, - @Nonnull QueryParamEntity query); + @Nonnull QueryParamEntity query, + @Nonnull String... properties); /** * 查询指定的设备属性列表 diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DeviceDataStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DeviceDataStoragePolicy.java index d652fb11..ac00b135 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DeviceDataStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DeviceDataStoragePolicy.java @@ -127,7 +127,8 @@ public interface DeviceDataStoragePolicy { */ @Nonnull Flux queryEachProperties(@Nonnull String deviceId, - @Nonnull QueryParamEntity query); + @Nonnull QueryParamEntity query, + @Nonnull String... property); /** * 查询指定的设备属性列表 diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/NoneDeviceDataStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/NoneDeviceDataStoragePolicy.java index bf4ff3e5..27fb5e40 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/NoneDeviceDataStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/NoneDeviceDataStoragePolicy.java @@ -84,7 +84,8 @@ public class NoneDeviceDataStoragePolicy implements DeviceDataStoragePolicy { @Nonnull @Override public Flux queryEachProperties(@Nonnull String deviceId, - @Nonnull QueryParamEntity query) { + @Nonnull QueryParamEntity query, + @Nonnull String... property) { return Flux.empty(); } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java index 005782c1..ee3a8e64 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java @@ -174,23 +174,19 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat @Nonnull @Override public Flux queryEachProperties(@Nonnull String deviceId, - @Nonnull QueryParamEntity query) { + @Nonnull QueryParamEntity query, + @Nonnull String... property) { - return deviceRegistry - .getDevice(deviceId) - .flatMapMany(device -> Mono - .zip(device.getProduct(), device.getMetadata()) - .flatMapMany(tp2 -> { + return this + .getProductAndMetadataByDevice(deviceId) + .flatMapMany(tp2 -> { - Map propertiesMap = tp2 - .getT2() - .getProperties() - .stream() - .collect(Collectors.toMap(PropertyMetadata::getId, Function - .identity(), (a, b) -> a)); + Map propertiesMap = getPropertyMetadata(tp2.getT2(), property) + .stream() + .collect(Collectors.toMap(PropertyMetadata::getId, Function.identity(), (a, b) -> a)); - return queryEachDeviceProperty(tp2.getT1().getId(), deviceId, propertiesMap, query); - })); + return queryEachDeviceProperty(tp2.getT1().getId(), deviceId, propertiesMap, query); + }); } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java index 650e5618..4207bbc6 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java @@ -169,33 +169,30 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD @Nonnull @Override public Flux queryEachProperties(@Nonnull String deviceId, - @Nonnull QueryParamEntity query) { + @Nonnull QueryParamEntity query, + @Nonnull String... property) { - return deviceRegistry - .getDevice(deviceId) - .flatMapMany(device -> Mono - .zip(device.getProduct(), device.getMetadata()) - .flatMapMany(tp2 -> { + return getProductAndMetadataByDevice(deviceId) + .flatMapMany(tp2 -> { - Map propertiesMap = tp2.getT2() - .getProperties() - .stream() - .collect(Collectors.toMap(PropertyMetadata::getId, Function.identity(), (a, b) -> a)); - if (propertiesMap.isEmpty()) { - return Flux.empty(); - } - return timeSeriesManager - .getService(devicePropertyMetric(tp2.getT1().getId())) - .aggregation(AggregationQueryParam - .of() - .agg(new LimitAggregationColumn("property", "property", Aggregation.TOP, query.getPageSize())) - .groupBy(new LimitGroup("property", "property", propertiesMap.size() * 2)) //按property分组 - .filter(query) - .filter(q -> q.where("deviceId", deviceId)) - ).map(data -> DeviceProperty - .of(data, data.getString("property").map(propertiesMap::get).orElse(null)) - .deviceId(deviceId)); - })); + Map propertiesMap = getPropertyMetadata(tp2.getT2(), property) + .stream() + .collect(Collectors.toMap(PropertyMetadata::getId, Function.identity(), (a, b) -> a)); + if (propertiesMap.isEmpty()) { + return Flux.empty(); + } + return timeSeriesManager + .getService(devicePropertyMetric(tp2.getT1().getId())) + .aggregation(AggregationQueryParam + .of() + .agg(new LimitAggregationColumn("property", "property", Aggregation.TOP, query.getPageSize())) + .groupBy(new LimitGroup("property", "property", propertiesMap.size() * 2)) //按property分组 + .filter(query) + .filter(q -> q.where("deviceId", deviceId).in("property", propertiesMap.keySet())) + ).map(data -> DeviceProperty + .of(data, data.getString("property").map(propertiesMap::get).orElse(null)) + .deviceId(deviceId)); + }); } protected String getTimeSeriesMetric(String productId) { From 68bed142feec16c24c2a3b85bcb7ae696fe9b2bb Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Thu, 27 May 2021 16:52:28 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=94=A8=E6=88=B7?= =?UTF-8?q?=E8=AF=A6=E6=83=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../standalone/authorize/LoginEvent.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/authorize/LoginEvent.java b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/authorize/LoginEvent.java index ceef672e..5b5bb741 100644 --- a/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/authorize/LoginEvent.java +++ b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/authorize/LoginEvent.java @@ -4,6 +4,8 @@ import org.hswebframework.web.authorization.Authentication; import org.hswebframework.web.authorization.Dimension; import org.hswebframework.web.authorization.Permission; import org.hswebframework.web.authorization.events.AuthorizationSuccessEvent; +import org.hswebframework.web.authorization.exception.AccessDenyException; +import org.jetlinks.community.auth.service.UserDetailService; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; @@ -17,16 +19,26 @@ import java.util.stream.Collectors; */ @Component public class LoginEvent { + private final UserDetailService detailService; + + public LoginEvent(UserDetailService detailService) { + this.detailService = detailService; + } + @EventListener - public void handleLoginSuccess(AuthorizationSuccessEvent event){ + public void handleLoginSuccess(AuthorizationSuccessEvent event) { Map result = event.getResult(); Authentication authentication = event.getAuthentication(); List dimensions = authentication.getDimensions(); - result.put("permissions",authentication.getPermissions()); - result.put("roles",dimensions); - result.put("user",authentication.getUser()); - result.put("currentAuthority",authentication.getPermissions().stream().map(Permission::getId).collect(Collectors.toList())); + result.put("permissions", authentication.getPermissions()); + result.put("roles", dimensions); + result.put("currentAuthority", authentication.getPermissions().stream().map(Permission::getId).collect(Collectors.toList())); + event.async( + detailService + .findUserDetail(event.getAuthentication().getUser().getId()) + .doOnNext(detail -> result.put("user", detail)) + ); } }