Conflicts:
	jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java
This commit is contained in:
Tensai 2021-05-28 11:52:50 +08:00
commit 6a8a11338e
14 changed files with 167 additions and 96 deletions

View File

@ -35,11 +35,16 @@ public interface Message {
return new SimpleMessage(id, null, null, Type.complete, null);
}
static Message pong(String id) {
return new SimpleMessage(id, null, null, Type.pong, null);
}
enum Type {
authError,
result,
error,
complete
complete,
ping,
pong
}
}

View File

@ -11,9 +11,11 @@ import org.hswebframework.web.logger.ReactiveLogger;
import org.jetlinks.community.gateway.external.Message;
import org.jetlinks.community.gateway.external.MessagingManager;
import org.jetlinks.community.gateway.external.SubscribeRequest;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
@ -64,8 +66,25 @@ public class WebSocketMessagingHandler implements WebSocketHandler {
.receive()
.doOnNext(message -> {
try {
if (message.getType() == WebSocketMessage.Type.PONG) {
return;
}
if (message.getType() == WebSocketMessage.Type.PING) {
session
.send(Mono.just(session.pongMessage(DataBufferFactory::allocateBuffer)))
.subscribe();
return;
}
MessagingRequest request = JSON.parseObject(message.getPayloadAsText(), MessagingRequest.class);
if (request == null || request.getType() == MessagingRequest.Type.ping) {
if (request == null) {
return;
}
if (request.getType() == MessagingRequest.Type.ping) {
session
.send(Mono.just(session.textMessage(JSON.toJSONString(
Message.pong(request.getId())
))))
.subscribe();
return;
}
if (StringUtils.isEmpty(request.getId())) {

View File

@ -12,13 +12,12 @@ import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.service.data.DeviceDataService;
import org.jetlinks.community.gateway.DeviceMessageUtils;
import org.jetlinks.reactor.ql.utils.CastUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
class DevicePropertiesMeasurement extends StaticMeasurement {
@ -45,16 +44,20 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
}
Flux<SimpleMeasurementValue> fromHistory(String deviceId, int history) {
return history <= 0 ? Flux.empty() : QueryParamEntity.newQuery()
Flux<SimpleMeasurementValue> fromHistory(String deviceId, int history, Set<String> properties) {
return history <= 0
? Flux.empty()
: QueryParamEntity
.newQuery()
.doPaging(0, history)
.execute(q -> dataService.queryEachProperties(deviceId, q))
.execute(q -> dataService.queryEachProperties(deviceId, q, properties.toArray(new String[0])))
.map(data -> SimpleMeasurementValue.of(data, data.getTimestamp()))
.sort(MeasurementValue.sort());
}
Map<String, Object> createValue(String property, Object value) {
return metadata.getProperty(property)
return metadata
.getProperty(property)
.map(meta -> {
Map<String, Object> values = new HashMap<>();
DataType type = meta.getValueType();
@ -74,7 +77,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
});
}
Flux<MeasurementValue> fromRealTime(String deviceId) {
Flux<MeasurementValue> fromRealTime(String deviceId, Set<String> properties) {
Subscription subscription = Subscription.of(
"realtime-device-properties-measurement",
@ -88,7 +91,9 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
Map<String, Integer> index = new HashMap<>();
int idx = 0;
for (PropertyMetadata prop : props) {
index.put(prop.getId(), idx++);
if (properties.isEmpty() || properties.contains(prop.getId())) {
index.put(prop.getId(), idx++);
}
}
return
eventBus
@ -106,6 +111,16 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
static ConfigMetadata configMetadata = new DefaultConfigMetadata()
.add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"));
static Set<String> getPropertiesFromParameter(MeasurementParameter parameter) {
return parameter
.get("properties")
.map(CastUtils::castArray)
.orElse(Collections.emptyList())
.stream()
.map(String::valueOf)
.collect(Collectors.toSet());
}
/**
* 历史
*/
@ -136,11 +151,12 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
@Override
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
return Mono.justOrEmpty(parameter.getString("deviceId"))
return Mono
.justOrEmpty(parameter.getString("deviceId"))
.flatMapMany(deviceId -> {
int history = parameter.getInt("history").orElse(1);
//合并历史数据和实时数据
return fromHistory(deviceId, history);
return fromHistory(deviceId, history, getPropertiesFromParameter(parameter));
});
}
}
@ -175,16 +191,17 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
@Override
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
return Mono.justOrEmpty(parameter.getString("deviceId"))
return Mono
.justOrEmpty(parameter.getString("deviceId"))
.flatMapMany(deviceId -> {
int history = parameter.getInt("history").orElse(0);
//合并历史数据和实时数据
return Flux.concat(
//查询历史数据
fromHistory(deviceId, history)
fromHistory(deviceId, history, getPropertiesFromParameter(parameter))
,
//从消息网关订阅实时事件消息
fromRealTime(deviceId)
fromRealTime(deviceId, getPropertiesFromParameter(parameter))
);
});
}

View File

@ -23,18 +23,8 @@ import java.util.function.Function;
@Component
public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider {
private MeterRegistry registry;
Map<String, LongAdder> productCounts = new ConcurrentHashMap<>();
Function<String, LongAdder> counterAdder = productId ->
productCounts.computeIfAbsent(productId, __id -> {
LongAdder adder = new LongAdder();
Gauge.builder("online-count", adder, LongAdder::sum)
.tag("productId", __id)
.register(registry);
return adder;
});
private final MeterRegistry registry;
public DeviceStatusMeasurementProvider(MeterRegistryManager registryManager,
LocalDeviceInstanceService instanceService,
@ -47,14 +37,13 @@ public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider {
addMeasurement(new DeviceStatusRecordMeasurement(instanceService, timeSeriesManager));
registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId(),
"target", "msgType", "productId");
"target", "msgType", "productId");
}
@Subscribe("/device/*/*/online")
public Mono<Void> incrementOnline(DeviceMessage msg){
return Mono.fromRunnable(()->{
public Mono<Void> incrementOnline(DeviceMessage msg) {
return Mono.fromRunnable(() -> {
String productId = parseProductId(msg);
counterAdder.apply(productId).increment();
registry
.counter("online", "productId", productId)
.increment();
@ -62,10 +51,9 @@ public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider {
}
@Subscribe("/device/*/*/offline")
public Mono<Void> incrementOffline(DeviceMessage msg){
return Mono.fromRunnable(()->{
public Mono<Void> incrementOffline(DeviceMessage msg) {
return Mono.fromRunnable(() -> {
String productId = parseProductId(msg);
// counterAdder.apply(productId).decrement();
registry
.counter("offline", "productId", productId)
.increment();

View File

@ -81,22 +81,28 @@ class DeviceStatusRecordMeasurement
String format = parameter.getString("format").orElse("yyyy年MM月dd日");
DateTimeFormatter formatter = DateTimeFormat.forPattern(format);
return AggregationQueryParam.of()
return AggregationQueryParam
.of()
.max("value")
.filter(query ->
query.where("name", "online-count")
.is("productId", parameter.getString("productId").orElse(null))
query.where("name", "gateway-server-session")
)
.from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-30).atZone(ZoneId.systemDefault()).toInstant())))
.from(parameter
.getDate("from")
.orElse(Date.from(LocalDateTime
.now()
.plusDays(-30)
.atZone(ZoneId.systemDefault())
.toInstant())))
.to(parameter.getDate("to").orElse(new Date()))
.groupBy(parameter.getInterval("time").orElse(Interval.ofDays(1)),
parameter.getString("format").orElse("yyyy年MM月dd日"))
parameter.getString("format").orElse("yyyy年MM月dd日"))
.limit(parameter.getInt("limit").orElse(10))
.execute(timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceMetrics())::aggregation)
.map(data -> {
long ts = data.getString("time")
.map(time -> DateTime.parse(time, formatter).getMillis())
.orElse(System.currentTimeMillis());
.map(time -> DateTime.parse(time, formatter).getMillis())
.orElse(System.currentTimeMillis());
return SimpleMeasurementValue.of(
data.get("value").orElse(0),
data.getString("time", ""),

View File

@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.jetlinks.community.device.service.data.StorageConstants.propertyIsIgnoreStorage;
import static org.jetlinks.community.device.service.data.StorageConstants.propertyIsJsonStringStorage;
@ -537,7 +538,29 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
.flatMap(product -> Mono.zip(Mono.just(product), product.getMetadata()));
}
//将毫秒转为纳秒努力让数据不重复
protected List<PropertyMetadata> getPropertyMetadata(DeviceMetadata metadata, String... properties) {
if (properties == null || properties.length == 0) {
return metadata.getProperties();
}
if (properties.length == 1) {
return metadata.getProperty(properties[0])
.map(Arrays::asList)
.orElseGet(Collections::emptyList);
}
Set<String> ids = new HashSet<>(Arrays.asList(properties));
return metadata
.getProperties()
.stream()
.filter(prop -> ids.isEmpty() || ids.contains(prop.getId()))
.collect(Collectors.toList());
}
/**
* 将毫秒转为纳秒努力让数据不重复
*
* @param millis 毫秒值
* @return 尽可能不会重复的long值
*/
protected long createUniqueNanoTime(long millis) {
long nano = TimeUnit.MILLISECONDS.toNanos(millis);

View File

@ -112,10 +112,11 @@ public class DefaultDeviceDataService implements DeviceDataService {
@Nonnull
@Override
public Flux<DeviceProperty> queryEachProperties(@Nonnull String deviceId,
@Nonnull QueryParamEntity query) {
@Nonnull QueryParamEntity query,
@Nonnull String... properties) {
return this
.getDeviceStrategy(deviceId)
.flatMapMany(strategy -> strategy.queryEachProperties(deviceId, query));
.flatMapMany(strategy -> strategy.queryEachProperties(deviceId, query,properties));
}
@Nonnull

View File

@ -98,7 +98,8 @@ public interface DeviceDataService {
*/
@Nonnull
Flux<DeviceProperty> queryEachProperties(@Nonnull String deviceId,
@Nonnull QueryParamEntity query);
@Nonnull QueryParamEntity query,
@Nonnull String... properties);
/**
* 查询指定的设备属性列表

View File

@ -127,7 +127,8 @@ public interface DeviceDataStoragePolicy {
*/
@Nonnull
Flux<DeviceProperty> queryEachProperties(@Nonnull String deviceId,
@Nonnull QueryParamEntity query);
@Nonnull QueryParamEntity query,
@Nonnull String... property);
/**
* 查询指定的设备属性列表

View File

@ -84,7 +84,8 @@ public class NoneDeviceDataStoragePolicy implements DeviceDataStoragePolicy {
@Nonnull
@Override
public Flux<DeviceProperty> queryEachProperties(@Nonnull String deviceId,
@Nonnull QueryParamEntity query) {
@Nonnull QueryParamEntity query,
@Nonnull String... property) {
return Flux.empty();
}

View File

@ -176,23 +176,19 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
@Nonnull
@Override
public Flux<DeviceProperty> queryEachProperties(@Nonnull String deviceId,
@Nonnull QueryParamEntity query) {
@Nonnull QueryParamEntity query,
@Nonnull String... property) {
return deviceRegistry
.getDevice(deviceId)
.flatMapMany(device -> Mono
.zip(device.getProduct(), device.getMetadata())
.flatMapMany(tp2 -> {
return this
.getProductAndMetadataByDevice(deviceId)
.flatMapMany(tp2 -> {
Map<String, PropertyMetadata> propertiesMap = tp2
.getT2()
.getProperties()
.stream()
.collect(Collectors.toMap(PropertyMetadata::getId, Function
.identity(), (a, b) -> a));
Map<String, PropertyMetadata> propertiesMap = getPropertyMetadata(tp2.getT2(), property)
.stream()
.collect(Collectors.toMap(PropertyMetadata::getId, Function.identity(), (a, b) -> a));
return queryEachDeviceProperty(tp2.getT1().getId(), deviceId, propertiesMap, query);
}));
return queryEachDeviceProperty(tp2.getT1().getId(), deviceId, propertiesMap, query);
});
}

View File

@ -169,33 +169,30 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
@Nonnull
@Override
public Flux<DeviceProperty> queryEachProperties(@Nonnull String deviceId,
@Nonnull QueryParamEntity query) {
@Nonnull QueryParamEntity query,
@Nonnull String... property) {
return deviceRegistry
.getDevice(deviceId)
.flatMapMany(device -> Mono
.zip(device.getProduct(), device.getMetadata())
.flatMapMany(tp2 -> {
return getProductAndMetadataByDevice(deviceId)
.flatMapMany(tp2 -> {
Map<String, PropertyMetadata> propertiesMap = tp2.getT2()
.getProperties()
.stream()
.collect(Collectors.toMap(PropertyMetadata::getId, Function.identity(), (a, b) -> a));
if (propertiesMap.isEmpty()) {
return Flux.empty();
}
return timeSeriesManager
.getService(devicePropertyMetric(tp2.getT1().getId()))
.aggregation(AggregationQueryParam
.of()
.agg(new LimitAggregationColumn("property", "property", Aggregation.TOP, query.getPageSize()))
.groupBy(new LimitGroup("property", "property", propertiesMap.size() * 2)) //按property分组
.filter(query)
.filter(q -> q.where("deviceId", deviceId))
).map(data -> DeviceProperty
.of(data, data.getString("property").map(propertiesMap::get).orElse(null))
.deviceId(deviceId));
}));
Map<String, PropertyMetadata> propertiesMap = getPropertyMetadata(tp2.getT2(), property)
.stream()
.collect(Collectors.toMap(PropertyMetadata::getId, Function.identity(), (a, b) -> a));
if (propertiesMap.isEmpty()) {
return Flux.empty();
}
return timeSeriesManager
.getService(devicePropertyMetric(tp2.getT1().getId()))
.aggregation(AggregationQueryParam
.of()
.agg(new LimitAggregationColumn("property", "property", Aggregation.TOP, query.getPageSize()))
.groupBy(new LimitGroup("property", "property", propertiesMap.size() * 2)) //按property分组
.filter(query)
.filter(q -> q.where("deviceId", deviceId).in("property", propertiesMap.keySet()))
).map(data -> DeviceProperty
.of(data, data.getString("property").map(propertiesMap::get).orElse(null))
.deviceId(deviceId));
});
}
protected String getTimeSeriesMetric(String productId) {

View File

@ -4,6 +4,8 @@ import org.hswebframework.web.authorization.Authentication;
import org.hswebframework.web.authorization.Dimension;
import org.hswebframework.web.authorization.Permission;
import org.hswebframework.web.authorization.events.AuthorizationSuccessEvent;
import org.hswebframework.web.authorization.exception.AccessDenyException;
import org.jetlinks.community.auth.service.UserDetailService;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@ -17,16 +19,26 @@ import java.util.stream.Collectors;
*/
@Component
public class LoginEvent {
private final UserDetailService detailService;
public LoginEvent(UserDetailService detailService) {
this.detailService = detailService;
}
@EventListener
public void handleLoginSuccess(AuthorizationSuccessEvent event){
public void handleLoginSuccess(AuthorizationSuccessEvent event) {
Map<String, Object> result = event.getResult();
Authentication authentication = event.getAuthentication();
List<Dimension> dimensions = authentication.getDimensions();
result.put("permissions",authentication.getPermissions());
result.put("roles",dimensions);
result.put("user",authentication.getUser());
result.put("currentAuthority",authentication.getPermissions().stream().map(Permission::getId).collect(Collectors.toList()));
result.put("permissions", authentication.getPermissions());
result.put("roles", dimensions);
result.put("currentAuthority", authentication.getPermissions().stream().map(Permission::getId).collect(Collectors.toList()));
event.async(
detailService
.findUserDetail(event.getAuthentication().getUser().getId())
.doOnNext(detail -> result.put("user", detail))
);
}
}

View File

@ -12,6 +12,8 @@ import org.hswebframework.web.authorization.token.redis.RedisUserTokenManager;
import org.jetlinks.community.device.entity.DeviceInstanceEntity;
import org.jetlinks.community.device.entity.DeviceProductEntity;
import org.jetlinks.community.device.service.AutoDiscoverDeviceRegistry;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.micrometer.MeterRegistryManager;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.config.ConfigStorageManager;
@ -145,8 +147,10 @@ public class JetLinksConfiguration {
}
@Bean
public GatewayServerMonitor gatewayServerMonitor(JetLinksProperties properties, MeterRegistry registry) {
GatewayServerMetrics metrics = new MicrometerGatewayServerMetrics(properties.getServerId(), registry);
public GatewayServerMonitor gatewayServerMonitor(JetLinksProperties properties, MeterRegistryManager registry) {
GatewayServerMetrics metrics = new MicrometerGatewayServerMetrics(properties.getServerId(),
registry.getMeterRegister(DeviceTimeSeriesMetric
.deviceMetrics().getId()));
return new GatewayServerMonitor() {
@Override