Compare commits
16 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
e395e4041e | |
|
|
e134dfec6f | |
|
|
3a45ac86da | |
|
|
f99b648b88 | |
|
|
d4645ad089 | |
|
|
88e0dd4667 | |
|
|
1bd136f3d2 | |
|
|
fc651ecd9d | |
|
|
dbceadc5e0 | |
|
|
520ca24fed | |
|
|
a6100d36f9 | |
|
|
80ed50211a | |
|
|
3a1f0b65de | |
|
|
2cfc78f1da | |
|
|
ad34196a2a | |
|
|
1e37c685cc |
|
|
@ -1,7 +1,7 @@
|
||||||
# JetLinks 物联网基础平台
|
# JetLinks 物联网基础平台
|
||||||
|
|
||||||

|

|
||||||

|

|
||||||
[](https://app.codacy.com/gh/jetlinks/jetlinks-community?utm_source=github.com&utm_medium=referral&utm_content=jetlinks/jetlinks-community&utm_campaign=Badge_Grade_Settings)
|
[](https://app.codacy.com/gh/jetlinks/jetlinks-community?utm_source=github.com&utm_medium=referral&utm_content=jetlinks/jetlinks-community&utm_campaign=Badge_Grade_Settings)
|
||||||
[](https://www.oscs1024.com/project/jetlinks/jetlinks-community?ref=badge_small)
|
[](https://www.oscs1024.com/project/jetlinks/jetlinks-community?ref=badge_small)
|
||||||
[](https://github.com/jetlinks/jetlinks-community)
|
[](https://github.com/jetlinks/jetlinks-community)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,24 @@
|
||||||
|
package org.jetlinks.community.configuration;
|
||||||
|
|
||||||
|
import org.jetlinks.community.resource.ui.UiMenuResourceProvider;
|
||||||
|
import org.jetlinks.community.resource.ui.UiResourceProvider;
|
||||||
|
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||||
|
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
|
||||||
|
@AutoConfiguration
|
||||||
|
@ConditionalOnProperty(prefix = "jetlinks.ui", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||||
|
public class UiResourceConfiguration {
|
||||||
|
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public UiResourceProvider uiResourceProvider() {
|
||||||
|
return new UiResourceProvider();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public UiMenuResourceProvider uiMenuResourceProvider() {
|
||||||
|
return new UiMenuResourceProvider();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,14 @@
|
||||||
|
package org.jetlinks.community.resource.ui;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.jetlinks.community.resource.ClassPathJsonResourceProvider;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class UiMenuResourceProvider extends ClassPathJsonResourceProvider {
|
||||||
|
public static final String TYPE = "ui-menus";
|
||||||
|
|
||||||
|
|
||||||
|
public UiMenuResourceProvider() {
|
||||||
|
super(TYPE, "classpath*:/ui/*/baseMenu.json");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,83 @@
|
||||||
|
package org.jetlinks.community.resource.ui;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
|
import lombok.SneakyThrows;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.jetlinks.community.resource.Resource;
|
||||||
|
import org.jetlinks.community.resource.ResourceProvider;
|
||||||
|
import org.jetlinks.community.resource.SimpleResource;
|
||||||
|
import org.jetlinks.community.utils.ObjectMappers;
|
||||||
|
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
|
||||||
|
import org.springframework.core.io.support.ResourcePatternResolver;
|
||||||
|
import org.springframework.util.StreamUtils;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class UiResourceProvider implements ResourceProvider {
|
||||||
|
public static final String TYPE = "ui";
|
||||||
|
|
||||||
|
private static final ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
|
||||||
|
|
||||||
|
private List<Resource> cache;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getType() {
|
||||||
|
return TYPE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Flux<Resource> getResources() {
|
||||||
|
return Flux.fromIterable(cache == null ? cache = read() : cache);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SneakyThrows
|
||||||
|
private List<Resource> read() {
|
||||||
|
List<Resource> resources = new ArrayList<>();
|
||||||
|
try {
|
||||||
|
for (org.springframework.core.io.Resource resource : resolver.getResources("classpath*:/ui/*/package.json")) {
|
||||||
|
try (InputStream stream = resource.getInputStream()) {
|
||||||
|
String s = StreamUtils.copyToString(stream, StandardCharsets.UTF_8);
|
||||||
|
Module m = ObjectMappers.parseJson(s, Module.class);
|
||||||
|
String path = resource.getURL().getPath();
|
||||||
|
String[] parts = path.split("/");
|
||||||
|
if (parts.length > 2) {
|
||||||
|
m.setPath(parts[parts.length - 3] + "/" + parts[parts.length - 2]);
|
||||||
|
resources.add(m.toResource());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
log.warn("load ui resource error", e);
|
||||||
|
}
|
||||||
|
return resources;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Flux<Resource> getResources(Collection<String> id) {
|
||||||
|
return Flux.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
public static class Module {
|
||||||
|
private String id;
|
||||||
|
private String name;
|
||||||
|
private String description;
|
||||||
|
private String path;
|
||||||
|
|
||||||
|
public SimpleResource toResource() {
|
||||||
|
id = StringUtils.isBlank(id) ? name : id;
|
||||||
|
return SimpleResource.of(id, TYPE, ObjectMappers.toJsonString(this));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -9,6 +9,7 @@ import org.hswebframework.web.authorization.exception.UnAuthorizedException;
|
||||||
import org.jetlinks.community.resource.Resource;
|
import org.jetlinks.community.resource.Resource;
|
||||||
import org.jetlinks.community.resource.ResourceManager;
|
import org.jetlinks.community.resource.ResourceManager;
|
||||||
import org.jetlinks.community.resource.TypeScriptDeclareResourceProvider;
|
import org.jetlinks.community.resource.TypeScriptDeclareResourceProvider;
|
||||||
|
import org.jetlinks.community.resource.ui.UiResourceProvider;
|
||||||
import org.springframework.web.bind.annotation.GetMapping;
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
import org.springframework.web.bind.annotation.PathVariable;
|
import org.springframework.web.bind.annotation.PathVariable;
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
|
@ -17,6 +18,7 @@ import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
@RequestMapping("/system/resources")
|
@RequestMapping("/system/resources")
|
||||||
|
|
@ -26,6 +28,15 @@ public class SystemResourcesController {
|
||||||
|
|
||||||
private final ResourceManager resourceManager;
|
private final ResourceManager resourceManager;
|
||||||
|
|
||||||
|
@GetMapping("/ui")
|
||||||
|
@SneakyThrows
|
||||||
|
@Authorize(merge = false)
|
||||||
|
public Flux<Object> getUIResources() {
|
||||||
|
return resourceManager
|
||||||
|
.getResources(UiResourceProvider.TYPE)
|
||||||
|
.map(resource->resource.as(HashMap.class));
|
||||||
|
}
|
||||||
|
|
||||||
@GetMapping("/{type}")
|
@GetMapping("/{type}")
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
public Flux<String> getResources(@PathVariable String type) {
|
public Flux<String> getResources(@PathVariable String type) {
|
||||||
|
|
|
||||||
|
|
@ -1 +1,2 @@
|
||||||
org.jetlinks.community.configuration.CommonConfiguration
|
org.jetlinks.community.configuration.CommonConfiguration
|
||||||
|
org.jetlinks.community.configuration.UiResourceConfiguration
|
||||||
|
|
@ -110,12 +110,12 @@ public class MqttClientProvider implements NetworkProvider<MqttClientProperties>
|
||||||
return new DefaultConfigMetadata()
|
return new DefaultConfigMetadata()
|
||||||
.add("id", "id", "", new StringType())
|
.add("id", "id", "", new StringType())
|
||||||
.add("remoteHost", "远程地址", "", new StringType())
|
.add("remoteHost", "远程地址", "", new StringType())
|
||||||
.add("remotePort", "远程地址", "", new IntType())
|
.add("remotePort", "远程端口", "", new IntType())
|
||||||
.add("certId", "证书id", "", new StringType())
|
.add("certId", "证书ID", "", new StringType())
|
||||||
.add("secure", "开启TSL", "", new BooleanType())
|
.add("secure", "开启TSL", "", new BooleanType())
|
||||||
.add("clientId", "客户端ID", "", new BooleanType())
|
.add("clientId", "客户端ID", "", new StringType())
|
||||||
.add("username", "用户名", "", new BooleanType())
|
.add("username", "用户名", "", new StringType())
|
||||||
.add("password", "密码", "", new BooleanType());
|
.add("password", "密码", "", new StringType());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nonnull
|
@Nonnull
|
||||||
|
|
|
||||||
|
|
@ -13,12 +13,12 @@ import org.jetlinks.community.network.mqtt.server.MqttConnection;
|
||||||
import org.jetlinks.community.network.mqtt.server.MqttPublishing;
|
import org.jetlinks.community.network.mqtt.server.MqttPublishing;
|
||||||
import org.jetlinks.community.network.mqtt.server.MqttServer;
|
import org.jetlinks.community.network.mqtt.server.MqttServer;
|
||||||
import org.jetlinks.community.utils.ObjectMappers;
|
import org.jetlinks.community.utils.ObjectMappers;
|
||||||
import org.jetlinks.community.utils.SystemUtils;
|
|
||||||
import org.jetlinks.core.ProtocolSupport;
|
import org.jetlinks.core.ProtocolSupport;
|
||||||
import org.jetlinks.core.device.*;
|
import org.jetlinks.core.device.*;
|
||||||
import org.jetlinks.core.device.session.DeviceSessionManager;
|
import org.jetlinks.core.device.session.DeviceSessionManager;
|
||||||
import org.jetlinks.core.message.DeviceMessage;
|
import org.jetlinks.core.message.DeviceMessage;
|
||||||
import org.jetlinks.core.message.codec.*;
|
import org.jetlinks.core.message.codec.*;
|
||||||
|
import org.jetlinks.core.server.DeviceGatewayContext;
|
||||||
import org.jetlinks.core.server.session.DeviceSession;
|
import org.jetlinks.core.server.session.DeviceSession;
|
||||||
import org.jetlinks.core.server.session.KeepOnlineSession;
|
import org.jetlinks.core.server.session.KeepOnlineSession;
|
||||||
import org.jetlinks.core.trace.FluxTracer;
|
import org.jetlinks.core.trace.FluxTracer;
|
||||||
|
|
@ -28,7 +28,6 @@ import org.springframework.util.StringUtils;
|
||||||
import reactor.core.Disposable;
|
import reactor.core.Disposable;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.core.scheduler.Schedulers;
|
|
||||||
import reactor.util.function.Tuple3;
|
import reactor.util.function.Tuple3;
|
||||||
import reactor.util.function.Tuples;
|
import reactor.util.function.Tuples;
|
||||||
|
|
||||||
|
|
@ -140,14 +139,14 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
|
||||||
|
|
||||||
//处理连接,并进行认证
|
//处理连接,并进行认证
|
||||||
private Mono<Tuple3<DeviceOperator, AuthenticationResponse, MqttConnection>> handleConnection(MqttConnection connection) {
|
private Mono<Tuple3<DeviceOperator, AuthenticationResponse, MqttConnection>> handleConnection(MqttConnection connection) {
|
||||||
|
|
||||||
return Mono
|
return Mono
|
||||||
.justOrEmpty(connection.getAuth())
|
.justOrEmpty(connection.getAuth())
|
||||||
.flatMap(auth -> {
|
.flatMap(auth -> {
|
||||||
MqttAuthenticationRequest request = new MqttAuthenticationRequest(connection.getClientId(),
|
MqttAuthenticationRequest request = new MqttAuthenticationRequest(
|
||||||
auth.getUsername(),
|
connection.getClientId(),
|
||||||
auth.getPassword(),
|
auth.getUsername(),
|
||||||
getTransport());
|
auth.getPassword(),
|
||||||
|
getTransport());
|
||||||
return supportMono
|
return supportMono
|
||||||
//使用自定义协议来认证
|
//使用自定义协议来认证
|
||||||
.map(support -> support.authenticate(request, registry))
|
.map(support -> support.authenticate(request, registry))
|
||||||
|
|
@ -161,7 +160,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
|
||||||
})
|
})
|
||||||
.flatMap(resp -> {
|
.flatMap(resp -> {
|
||||||
//认证响应可以自定义设备ID,如果没有则使用mqtt的clientId
|
//认证响应可以自定义设备ID,如果没有则使用mqtt的clientId
|
||||||
String deviceId = StringUtils.isEmpty(resp.getDeviceId()) ? connection.getClientId() : resp.getDeviceId();
|
String deviceId = StringUtils.hasText(resp.getDeviceId()) ? resp.getDeviceId() : connection.getClientId();
|
||||||
//认证返回了新的设备ID,则使用新的设备
|
//认证返回了新的设备ID,则使用新的设备
|
||||||
return registry
|
return registry
|
||||||
.getDevice(deviceId)
|
.getDevice(deviceId)
|
||||||
|
|
@ -171,7 +170,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
|
||||||
;
|
;
|
||||||
})
|
})
|
||||||
.as(MonoTracer
|
.as(MonoTracer
|
||||||
.create(SpanName.auth(connection.getClientId()),
|
.create(SpanName.auth0(connection.getClientId()),
|
||||||
(span, tp3) -> {
|
(span, tp3) -> {
|
||||||
AuthenticationResponse response = tp3.getT2();
|
AuthenticationResponse response = tp3.getT2();
|
||||||
if (!response.isSuccess()) {
|
if (!response.isSuccess()) {
|
||||||
|
|
@ -201,38 +200,14 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
|
||||||
}
|
}
|
||||||
|
|
||||||
//处理认证结果
|
//处理认证结果
|
||||||
private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> handleAuthResponse(DeviceOperator device,
|
private Mono<Tuple3<MqttConnection, DeviceOperator, DeviceSession>> handleAuthResponse(DeviceOperator device,
|
||||||
AuthenticationResponse resp,
|
AuthenticationResponse resp,
|
||||||
MqttConnection connection) {
|
MqttConnection connection) {
|
||||||
return Mono
|
return Mono
|
||||||
.defer(() -> {
|
.defer(() -> {
|
||||||
String deviceId = device.getDeviceId();
|
String deviceId = device.getDeviceId();
|
||||||
//认证通过
|
//认证通过
|
||||||
if (resp.isSuccess()) {
|
if (resp.isSuccess()) {
|
||||||
//监听断开连接
|
|
||||||
connection.onClose(conn -> {
|
|
||||||
counter.decrement();
|
|
||||||
//监控信息
|
|
||||||
monitor.disconnected();
|
|
||||||
monitor.totalConnection(counter.sum());
|
|
||||||
|
|
||||||
sessionManager
|
|
||||||
.getSession(deviceId, false)
|
|
||||||
.flatMap(_tmp -> {
|
|
||||||
//只有与创建的会话相同才移除(下线),因为有可能设置了keepOnline,
|
|
||||||
//或者设备通过其他方式注册了会话,这里断开连接不能影响到以上情况.
|
|
||||||
if (_tmp != null && _tmp.isWrapFrom(MqttConnectionSession.class) && !(_tmp instanceof KeepOnlineSession)) {
|
|
||||||
MqttConnectionSession connectionSession = _tmp.unwrap(MqttConnectionSession.class);
|
|
||||||
if (connectionSession.getConnection() == conn) {
|
|
||||||
return sessionManager.remove(deviceId, true);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Mono.empty();
|
|
||||||
})
|
|
||||||
.subscribe();
|
|
||||||
});
|
|
||||||
|
|
||||||
counter.increment();
|
|
||||||
return sessionManager
|
return sessionManager
|
||||||
.compute(deviceId, old -> {
|
.compute(deviceId, old -> {
|
||||||
MqttConnectionSession newSession = new MqttConnectionSession(deviceId, device, getTransport(), connection, monitor);
|
MqttConnectionSession newSession = new MqttConnectionSession(deviceId, device, getTransport(), connection, monitor);
|
||||||
|
|
@ -246,68 +221,100 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
|
||||||
})
|
})
|
||||||
.defaultIfEmpty(newSession);
|
.defaultIfEmpty(newSession);
|
||||||
})
|
})
|
||||||
.mapNotNull(session->{
|
.mapNotNull(session -> {
|
||||||
try {
|
try {
|
||||||
return Tuples.of(connection.accept(), device, session.unwrap(MqttConnectionSession.class));
|
return Tuples.of(connection.accept(), device, session);
|
||||||
} catch (IllegalStateException ignore) {
|
} catch (IllegalStateException ignore) {
|
||||||
//忽略错误,偶尔可能会出现网络异常,导致accept时,连接已经中断.还有其他更好的处理方式?
|
//忽略错误,偶尔可能会出现网络异常,导致accept时,连接已经中断.还有其他更好的处理方式?
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.doOnNext(o -> {
|
|
||||||
//监控信息
|
|
||||||
monitor.connected();
|
|
||||||
monitor.totalConnection(counter.sum());
|
|
||||||
})
|
|
||||||
//会话empty说明注册会话失败?
|
//会话empty说明注册会话失败?
|
||||||
.switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED)));
|
.switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED)));
|
||||||
} else {
|
} else {
|
||||||
//认证失败返回 0x04 BAD_USER_NAME_OR_PASSWORD
|
//认证失败返回 0x04 BAD_USER_NAME_OR_PASSWORD
|
||||||
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
|
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
|
||||||
monitor.rejected();
|
|
||||||
log.warn("MQTT客户端认证[{}]失败:{}", deviceId, resp.getMessage());
|
log.warn("MQTT客户端认证[{}]失败:{}", deviceId, resp.getMessage());
|
||||||
}
|
}
|
||||||
return Mono.empty();
|
return Mono.empty();
|
||||||
})
|
})
|
||||||
.onErrorResume(error -> Mono.fromRunnable(() -> {
|
.onErrorResume(error -> Mono.fromRunnable(() -> {
|
||||||
log.error(error.getMessage(), error);
|
log.error(error.getMessage(), error);
|
||||||
monitor.rejected();
|
|
||||||
//发生错误时应答 SERVER_UNAVAILABLE
|
//发生错误时应答 SERVER_UNAVAILABLE
|
||||||
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
|
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
|
||||||
}))
|
}))
|
||||||
;
|
;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Mono<Void> handleClientConnect(MqttConnection connection,
|
||||||
|
DeviceOperator operator) {
|
||||||
|
return operator
|
||||||
|
.getProtocol()
|
||||||
|
.flatMap(supportMono -> supportMono
|
||||||
|
.onClientConnect(
|
||||||
|
DefaultTransport.MQTT,
|
||||||
|
connection,
|
||||||
|
new DeviceGatewayContext() {
|
||||||
|
@Override
|
||||||
|
public Mono<DeviceOperator> getDevice(String deviceId) {
|
||||||
|
return registry.getDevice(deviceId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<DeviceProductOperator> getProduct(String productId) {
|
||||||
|
return registry.getProduct(productId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<Void> onMessage(DeviceMessage message) {
|
||||||
|
return handleMessage(operator, message, connection)
|
||||||
|
.then();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
//处理已经建立连接的MQTT连接
|
//处理已经建立连接的MQTT连接
|
||||||
private Mono<Void> handleAcceptedMqttConnection(MqttConnection connection,
|
private Mono<Void> handleAcceptedMqttConnection(MqttConnection connection,
|
||||||
DeviceOperator operator,
|
DeviceOperator operator,
|
||||||
MqttConnectionSession session) {
|
DeviceSession session) {
|
||||||
return Flux
|
|
||||||
.usingWhen(Mono.just(connection),
|
|
||||||
MqttConnection::handleMessage,
|
return this
|
||||||
MqttConnection::close)
|
.handleClientConnect(connection, operator)
|
||||||
//网关暂停或者已停止时,则不处理消息
|
.thenMany(Flux.usingWhen(Mono.just(connection),
|
||||||
.filter(pb -> isStarted())
|
MqttConnection::handleMessage,
|
||||||
.publishOn(Schedulers.parallel())
|
MqttConnection::close))
|
||||||
//解码收到的mqtt报文
|
//解码收到的mqtt报文
|
||||||
.concatMap(publishing -> this
|
.concatMap(
|
||||||
.decodeAndHandleMessage(operator, session, publishing, connection)
|
publishing -> {
|
||||||
.as(MonoTracer
|
if (!isStarted()) {
|
||||||
.create(SpanName.upstream(connection.getClientId()),
|
return Mono.empty();
|
||||||
(span) -> span.setAttribute(SpanKey.message, publishing.print())))
|
}
|
||||||
)
|
return this
|
||||||
//合并遗言消息
|
.decodeAndHandleMessage(operator, session, publishing, connection)
|
||||||
.mergeWith(
|
.as(MonoTracer
|
||||||
Mono.justOrEmpty(connection.getWillMessage())
|
.create(SpanName.upstream0(connection.getClientId()),
|
||||||
//解码遗言消息
|
(span) -> span.setAttributeLazy(SpanKey.message, publishing::print)));
|
||||||
.flatMap(mqttMessage -> this.decodeAndHandleMessage(operator, session, mqttMessage, connection))
|
},
|
||||||
|
0
|
||||||
)
|
)
|
||||||
|
.as(flux -> {
|
||||||
|
MqttMessage will = connection.getWillMessage().orElse(null);
|
||||||
|
if (will != null) {
|
||||||
|
//合并遗言消息
|
||||||
|
return flux.mergeWith(
|
||||||
|
this.decodeAndHandleMessage(operator, session, will, connection)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return flux;
|
||||||
|
})
|
||||||
.then();
|
.then();
|
||||||
}
|
}
|
||||||
|
|
||||||
//解码消息并处理
|
//解码消息并处理
|
||||||
private Mono<Void> decodeAndHandleMessage(DeviceOperator operator,
|
private Mono<Void> decodeAndHandleMessage(DeviceOperator operator,
|
||||||
MqttConnectionSession session,
|
DeviceSession session,
|
||||||
MqttMessage message,
|
MqttMessage message,
|
||||||
MqttConnection connection) {
|
MqttConnection connection) {
|
||||||
monitor.receivedMessage();
|
monitor.receivedMessage();
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,7 @@ import org.hswebframework.web.exception.BusinessException;
|
||||||
import org.jetlinks.community.tdengine.TDEngineUtils;
|
import org.jetlinks.community.tdengine.TDEngineUtils;
|
||||||
import org.jetlinks.community.tdengine.term.TDengineQueryConditionBuilder;
|
import org.jetlinks.community.tdengine.term.TDengineQueryConditionBuilder;
|
||||||
import org.jetlinks.community.things.data.ThingsDataConstants;
|
import org.jetlinks.community.things.data.ThingsDataConstants;
|
||||||
|
import org.jetlinks.community.things.utils.ThingsDatabaseUtils;
|
||||||
import org.jetlinks.core.metadata.Converter;
|
import org.jetlinks.core.metadata.Converter;
|
||||||
import org.jetlinks.core.metadata.DataType;
|
import org.jetlinks.core.metadata.DataType;
|
||||||
import org.jetlinks.community.Interval;
|
import org.jetlinks.community.Interval;
|
||||||
|
|
@ -100,11 +101,7 @@ class TDengineThingDataHelper implements Disposable {
|
||||||
.getColumn(metric, term.getColumn())
|
.getColumn(metric, term.getColumn())
|
||||||
.ifPresent(meta -> {
|
.ifPresent(meta -> {
|
||||||
DataType type = meta.getValueType();
|
DataType type = meta.getValueType();
|
||||||
if (isArrayTerm(type, term)) {
|
ThingsDatabaseUtils.tryConvertTermValue(type, term);
|
||||||
term.setValue(tryConvertList(type, term));
|
|
||||||
} else if (type instanceof Converter) {
|
|
||||||
term.setValue(((Converter<?>) type).convert(term.getValue()));
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,8 @@ import org.hswebframework.ezorm.rdb.codec.NumberValueCodec;
|
||||||
import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
|
import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
|
||||||
import org.jetlinks.community.ConfigMetadataConstants;
|
import org.jetlinks.community.ConfigMetadataConstants;
|
||||||
import org.jetlinks.community.utils.ConverterUtils;
|
import org.jetlinks.community.utils.ConverterUtils;
|
||||||
|
import org.jetlinks.community.utils.TimeUtils;
|
||||||
|
import org.jetlinks.core.metadata.Converter;
|
||||||
import org.jetlinks.core.metadata.DataType;
|
import org.jetlinks.core.metadata.DataType;
|
||||||
import org.jetlinks.core.metadata.PropertyMetadata;
|
import org.jetlinks.core.metadata.PropertyMetadata;
|
||||||
import org.jetlinks.core.metadata.types.*;
|
import org.jetlinks.core.metadata.types.*;
|
||||||
|
|
@ -208,6 +210,33 @@ public class ThingsDatabaseUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void tryConvertTermValue(DataType type,
|
||||||
|
Term term,
|
||||||
|
BiFunction<DataType, Object, Object> tryConvertTermValue) {
|
||||||
|
tryConvertTermValue(type,
|
||||||
|
term,
|
||||||
|
ThingsDatabaseUtils::isDoNotConvertValue,
|
||||||
|
ThingsDatabaseUtils::maybeList,
|
||||||
|
tryConvertTermValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void tryConvertTermValue(DataType type,
|
||||||
|
Term term) {
|
||||||
|
tryConvertTermValue(type,
|
||||||
|
term,
|
||||||
|
ThingsDatabaseUtils::tryConvertTermValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Object tryConvertTermValue(DataType type, Object value) {
|
||||||
|
if (type instanceof DateTimeType) {
|
||||||
|
return TimeUtils.convertToDate(value).getTime();
|
||||||
|
} else if (type instanceof Converter) {
|
||||||
|
return ((Converter<?>) type).convert(value);
|
||||||
|
}
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static void tryConvertTermValue(DataType type,
|
public static void tryConvertTermValue(DataType type,
|
||||||
Term term,
|
Term term,
|
||||||
BiPredicate<DataType, Term> isDoNotConvertValue,
|
BiPredicate<DataType, Term> isDoNotConvertValue,
|
||||||
|
|
|
||||||
|
|
@ -127,24 +127,27 @@ public class DeviceTagEntity extends GenericEntity<String> {
|
||||||
return tag;
|
return tag;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public DeviceProperty toProperty() {
|
public DeviceProperty toProperty() {
|
||||||
DeviceProperty property = new DeviceProperty();
|
DeviceProperty property = new DeviceProperty();
|
||||||
property.setProperty(getKey());
|
property.setProperty(getKey());
|
||||||
property.setDeviceId(deviceId);
|
property.setDeviceId(deviceId);
|
||||||
property.setType(type);
|
property.setType(type);
|
||||||
property.setPropertyName(name);
|
property.setPropertyName(name);
|
||||||
|
property.setValue(parseValue());
|
||||||
|
return property;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Object parseValue() {
|
||||||
DataType type = Optional
|
DataType type = Optional
|
||||||
.ofNullable(DataTypes.lookup(getType()))
|
.ofNullable(DataTypes.lookup(getType()))
|
||||||
.map(Supplier::get)
|
.map(Supplier::get)
|
||||||
.orElseGet(UnknownType::new);
|
.orElseGet(UnknownType::new);
|
||||||
if (type instanceof Converter) {
|
if (type instanceof Converter) {
|
||||||
property.setValue(((Converter<?>) type).convert(getValue()));
|
return ((Converter<?>) type).convert(getValue());
|
||||||
} else {
|
} else {
|
||||||
property.setValue(getValue());
|
return getValue();
|
||||||
}
|
}
|
||||||
return property;
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//以物模型标签基础数据为准,重构数据库保存的可能已过时的标签数据
|
//以物模型标签基础数据为准,重构数据库保存的可能已过时的标签数据
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,18 @@
|
||||||
|
package org.jetlinks.community.device.service.tag;
|
||||||
|
|
||||||
|
import org.jetlinks.community.buffer.BufferProperties;
|
||||||
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@ConfigurationProperties(prefix = "device.tag.synchronizer")
|
||||||
|
public class DeviceTagProperties extends BufferProperties {
|
||||||
|
|
||||||
|
public DeviceTagProperties(){
|
||||||
|
setFilePath("./data/device-tag-synchronizer");
|
||||||
|
setSize(500);
|
||||||
|
setParallelism(1);
|
||||||
|
getEviction().setMaxSize(100_0000);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,224 @@
|
||||||
|
package org.jetlinks.community.device.service.tag;
|
||||||
|
|
||||||
|
import lombok.*;
|
||||||
|
import org.apache.commons.collections4.MapUtils;
|
||||||
|
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
|
||||||
|
import org.hswebframework.web.crud.events.EntityCreatedEvent;
|
||||||
|
import org.hswebframework.web.crud.events.EntityDeletedEvent;
|
||||||
|
import org.hswebframework.web.crud.events.EntityModifyEvent;
|
||||||
|
import org.hswebframework.web.crud.events.EntitySavedEvent;
|
||||||
|
import org.jetlinks.core.device.DeviceOperator;
|
||||||
|
import org.jetlinks.core.device.DeviceRegistry;
|
||||||
|
import org.jetlinks.core.device.DeviceThingType;
|
||||||
|
import org.jetlinks.core.message.UpdateTagMessage;
|
||||||
|
import org.jetlinks.core.things.ThingsDataManager;
|
||||||
|
import org.jetlinks.core.utils.Reactors;
|
||||||
|
import org.jetlinks.community.buffer.BufferSettings;
|
||||||
|
import org.jetlinks.community.buffer.PersistenceBuffer;
|
||||||
|
import org.jetlinks.community.device.entity.DeviceTagEntity;
|
||||||
|
import org.jetlinks.community.gateway.annotation.Subscribe;
|
||||||
|
import org.jetlinks.community.things.data.ThingsDataWriter;
|
||||||
|
import org.springframework.boot.CommandLineRunner;
|
||||||
|
import org.springframework.boot.SpringApplication;
|
||||||
|
import org.springframework.context.event.EventListener;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
|
import javax.annotation.PostConstruct;
|
||||||
|
import javax.annotation.PreDestroy;
|
||||||
|
import java.io.Externalizable;
|
||||||
|
import java.io.ObjectInput;
|
||||||
|
import java.io.ObjectOutput;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class DeviceTagSynchronizer implements CommandLineRunner {
|
||||||
|
|
||||||
|
private final DeviceTagProperties properties;
|
||||||
|
|
||||||
|
private final DeviceRegistry registry;
|
||||||
|
|
||||||
|
private final ThingsDataWriter dataWriter;
|
||||||
|
|
||||||
|
private final ThingsDataManager dataManager;
|
||||||
|
|
||||||
|
private final ReactiveRepository<DeviceTagEntity, String> tagRepository;
|
||||||
|
|
||||||
|
public PersistenceBuffer<DeviceTagBuffer> buffer;
|
||||||
|
|
||||||
|
@Subscribe(value = "/device/*/*/message/tags/update")
|
||||||
|
public Mono<Void> updateDeviceTag(UpdateTagMessage message) {
|
||||||
|
Map<String, Object> tags = message.getTags();
|
||||||
|
if (MapUtils.isEmpty(tags)) {
|
||||||
|
return Mono.empty();
|
||||||
|
}
|
||||||
|
String deviceId = message.getDeviceId();
|
||||||
|
|
||||||
|
return registry
|
||||||
|
.getDevice(deviceId)
|
||||||
|
.flatMap(DeviceOperator::getMetadata)
|
||||||
|
.flatMapMany(metadata -> Flux
|
||||||
|
.fromIterable(tags.entrySet())
|
||||||
|
.filter(e -> e.getValue() != null)
|
||||||
|
.flatMap(e -> {
|
||||||
|
DeviceTagEntity tagEntity = metadata
|
||||||
|
.getTag(e.getKey())
|
||||||
|
.map(tagMeta -> DeviceTagEntity.of(tagMeta, e.getValue()))
|
||||||
|
.orElseGet(() -> {
|
||||||
|
DeviceTagEntity entity = new DeviceTagEntity();
|
||||||
|
entity.setKey(e.getKey());
|
||||||
|
entity.setType("string");
|
||||||
|
entity.setName(e.getKey());
|
||||||
|
entity.setCreateTime(new Date());
|
||||||
|
entity.setDescription("设备上报");
|
||||||
|
entity.setValue(String.valueOf(e.getValue()));
|
||||||
|
return entity;
|
||||||
|
});
|
||||||
|
tagEntity.setTimestamp(message.getTimestamp());
|
||||||
|
tagEntity.setDeviceId(deviceId);
|
||||||
|
tagEntity.setId(DeviceTagEntity.createTagId(deviceId, tagEntity.getKey()));
|
||||||
|
|
||||||
|
return dataWriter
|
||||||
|
.updateTag(DeviceThingType.device.getId(),
|
||||||
|
tagEntity.getDeviceId(),
|
||||||
|
tagEntity.getKey(),
|
||||||
|
System.currentTimeMillis(),
|
||||||
|
e.getValue())
|
||||||
|
.then(writeBuffer(tagEntity));
|
||||||
|
|
||||||
|
}))
|
||||||
|
.then();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public Mono<Void> writeBuffer(DeviceTagEntity entity) {
|
||||||
|
return buffer.writeAsync(new DeviceTagBuffer(entity));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private Mono<DeviceTagEntity> convertEntity(DeviceTagBuffer buffer) {
|
||||||
|
//从最新缓存中获取最新的数据,并填入准备入库的实体中
|
||||||
|
return dataManager
|
||||||
|
.getLastTag(DeviceThingType.device.getId(),
|
||||||
|
buffer.getTag().getDeviceId(),
|
||||||
|
buffer.getTag().getKey(),
|
||||||
|
System.currentTimeMillis())
|
||||||
|
.map(tag -> {
|
||||||
|
//缓存中的数据比buffer中的新,则更新为buffer中的数据
|
||||||
|
if (tag.getTimestamp() >= buffer.tag.getTimestamp()) {
|
||||||
|
buffer.getTag().setTimestamp(tag.getTimestamp());
|
||||||
|
buffer.getTag().setValue(String.valueOf(tag.getValue()));
|
||||||
|
}
|
||||||
|
return buffer.getTag();
|
||||||
|
})
|
||||||
|
.defaultIfEmpty(buffer.tag);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Mono<Boolean> handleBuffer(Flux<DeviceTagBuffer> buffer) {
|
||||||
|
|
||||||
|
return tagRepository
|
||||||
|
.save(buffer.flatMap(this::convertEntity))
|
||||||
|
.contextWrite(ctx -> ctx.put(DeviceTagSynchronizer.class, this))
|
||||||
|
.then(Reactors.ALWAYS_FALSE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@EventListener
|
||||||
|
public void handleDeviceTagEvent(EntityCreatedEvent<DeviceTagEntity> event) {
|
||||||
|
event.async(updateTag(event.getEntity()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@EventListener
|
||||||
|
public void handleDeviceTagEvent(EntitySavedEvent<DeviceTagEntity> event) {
|
||||||
|
event.async(updateTag(event.getEntity()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@EventListener
|
||||||
|
public void handleDeviceTagEvent(EntityModifyEvent<DeviceTagEntity> event) {
|
||||||
|
event.async(updateTag(event.getAfter()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@EventListener
|
||||||
|
public void handleDeviceTagEvent(EntityDeletedEvent<DeviceTagEntity> event) {
|
||||||
|
event.async(
|
||||||
|
Flux
|
||||||
|
.fromIterable(event.getEntity())
|
||||||
|
.flatMap(entity -> dataWriter
|
||||||
|
.removeTag(DeviceThingType.device.getId(),
|
||||||
|
entity.getDeviceId(),
|
||||||
|
entity.getKey())
|
||||||
|
.then()
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 更新标签,界面上手动修改标签?
|
||||||
|
*
|
||||||
|
* @param entityList 标签
|
||||||
|
* @return Void
|
||||||
|
*/
|
||||||
|
private Mono<Void> updateTag(List<DeviceTagEntity> entityList) {
|
||||||
|
return Mono.deferContextual(ctx -> {
|
||||||
|
//更新来自消息的标签,不需要再次更新
|
||||||
|
if (ctx.hasKey(DeviceTagSynchronizer.class)) {
|
||||||
|
return Mono.empty();
|
||||||
|
}
|
||||||
|
return Flux
|
||||||
|
.fromIterable(entityList)
|
||||||
|
.flatMap(entity -> dataWriter
|
||||||
|
.updateTag(DeviceThingType.device.getId(),
|
||||||
|
entity.getDeviceId(),
|
||||||
|
entity.getKey(),
|
||||||
|
System.currentTimeMillis(),
|
||||||
|
entity.parseValue()))
|
||||||
|
.then();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init() {
|
||||||
|
buffer = new PersistenceBuffer<>(
|
||||||
|
BufferSettings.create(properties),
|
||||||
|
DeviceTagBuffer::new,
|
||||||
|
this::handleBuffer)
|
||||||
|
.name("device-tag-synchronizer");
|
||||||
|
buffer.init();
|
||||||
|
}
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
public void shutdown() {
|
||||||
|
buffer.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run(String... args) throws Exception {
|
||||||
|
buffer.start();
|
||||||
|
SpringApplication
|
||||||
|
.getShutdownHandlers()
|
||||||
|
.add(buffer::dispose);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
public static class DeviceTagBuffer implements Externalizable {
|
||||||
|
private DeviceTagEntity tag;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeExternal(ObjectOutput out) {
|
||||||
|
tag.writeExternal(out);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readExternal(ObjectInput in) {
|
||||||
|
tag = new DeviceTagEntity();
|
||||||
|
tag.readExternal(in);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -2,6 +2,7 @@ package org.jetlinks.community.rule.engine.alarm;
|
||||||
|
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.hswebframework.web.bean.FastBeanCopier;
|
import org.hswebframework.web.bean.FastBeanCopier;
|
||||||
import org.hswebframework.web.i18n.LocaleUtils;
|
import org.hswebframework.web.i18n.LocaleUtils;
|
||||||
|
|
@ -471,6 +472,8 @@ public class DefaultAlarmHandler implements AlarmHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
public static class TriggerCache implements Externalizable {
|
public static class TriggerCache implements Externalizable {
|
||||||
|
|
||||||
static final byte stateNormal = 0x01;
|
static final byte stateNormal = 0x01;
|
||||||
|
|
@ -530,6 +533,8 @@ public class DefaultAlarmHandler implements AlarmHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
public static class RelieveCache implements Externalizable {
|
public static class RelieveCache implements Externalizable {
|
||||||
private long reliveTime;
|
private long reliveTime;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -434,6 +434,8 @@ public class DefaultAlarmRuleHandler implements AlarmRuleHandler, CommandLineRun
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
@Deprecated
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
public static class RecordCache implements Externalizable {
|
public static class RecordCache implements Externalizable {
|
||||||
|
|
||||||
static final byte stateNormal = 0x01;
|
static final byte stateNormal = 0x01;
|
||||||
|
|
|
||||||
|
|
@ -26,18 +26,18 @@ public class AlarmRecordMeasurementProvider extends StaticMeasurementProvider {
|
||||||
TimeSeriesManager timeSeriesManager) {
|
TimeSeriesManager timeSeriesManager) {
|
||||||
super(AlarmDashboardDefinition.alarm, AlarmObjectDefinition.record);
|
super(AlarmDashboardDefinition.alarm, AlarmObjectDefinition.record);
|
||||||
|
|
||||||
registry = registryManager.getMeterRegister(AlarmTimeSeriesMetric.alarmStreamMetrics().getId());
|
// registry = registryManager.getMeterRegister(AlarmTimeSeriesMetric.alarmStreamMetrics().getId());
|
||||||
addMeasurement(new AlarmRecordTrendMeasurement(timeSeriesManager));
|
addMeasurement(new AlarmRecordTrendMeasurement(timeSeriesManager));
|
||||||
addMeasurement(new AlarmRecordRankMeasurement(timeSeriesManager));
|
addMeasurement(new AlarmRecordRankMeasurement(timeSeriesManager));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@EventListener
|
// @EventListener
|
||||||
public void aggAlarmRecord(AlarmHistoryInfo info) {
|
// public void aggAlarmRecord(AlarmHistoryInfo info) {
|
||||||
registry
|
// registry
|
||||||
.counter("record-agg", getTags(info))
|
// .counter("record-agg", getTags(info))
|
||||||
.increment();
|
// .increment();
|
||||||
}
|
// }
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -219,15 +219,32 @@ public class SceneUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void refactorUpperKey(DeviceSelectorSpec deviceSelectorSpec) {
|
@SuppressWarnings("all")
|
||||||
|
public static void refactorUpperKey(Object source) {
|
||||||
// 将变量格式改为与查询的别名一致
|
// 将变量格式改为与查询的别名一致
|
||||||
if (VariableSource.Source.upper.equals(deviceSelectorSpec.getSource())) {
|
if (source instanceof VariableSource) {
|
||||||
// scene.xx.current -> scene.scene_xx_current
|
VariableSource variableSource = (VariableSource) source;
|
||||||
if (deviceSelectorSpec.getUpperKey().startsWith("scene.")) {
|
if (VariableSource.Source.upper.equals(variableSource.getSource())) {
|
||||||
String alias = SceneUtils.createColumnAlias("properties", deviceSelectorSpec.getUpperKey(), false);
|
variableSource.setUpperKey(transferSceneUpperKey(variableSource.getUpperKey()));
|
||||||
deviceSelectorSpec.setUpperKey("scene." + alias);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (source instanceof Map) {
|
||||||
|
Map<String, Object> map = (Map<String, Object>) source;
|
||||||
|
VariableSource variableSource = VariableSource.of(source);
|
||||||
|
// 将变量格式改为与查询的别名一致
|
||||||
|
if (VariableSource.Source.upper.equals(variableSource.getSource())) {
|
||||||
|
map.put("upperKey", transferSceneUpperKey(variableSource.getUpperKey()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String transferSceneUpperKey(String upperKey) {
|
||||||
|
// scene.xx.current -> scene.scene_xx_current
|
||||||
|
if (upperKey.startsWith("scene.")) {
|
||||||
|
String alias = SceneUtils.createColumnAlias("scene", upperKey, false);
|
||||||
|
return "scene." + alias;
|
||||||
|
}
|
||||||
|
return upperKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean isContainThis(String[] arr) {
|
private static boolean isContainThis(String[] arr) {
|
||||||
|
|
|
||||||
|
|
@ -148,11 +148,13 @@ public class Variable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void refactorPrefix(Variable main) {
|
public void refactorPrefix(Variable main) {
|
||||||
|
id = SceneUtils.transferSceneUpperKey(id);
|
||||||
if (CollectionUtils.isNotEmpty(children)) {
|
if (CollectionUtils.isNotEmpty(children)) {
|
||||||
for (Variable child : children) {
|
for (Variable child : children) {
|
||||||
if (!child.getId().startsWith(main.id + ".")) {
|
if (!child.getId().startsWith(main.id + ".")) {
|
||||||
child.setId(main.id + "." + child.getId());
|
child.setId(main.id + "." + child.getId());
|
||||||
}
|
}
|
||||||
|
child.setId(SceneUtils.transferSceneUpperKey(child.getId()));
|
||||||
|
|
||||||
if (StringUtils.hasText(child.getFullName()) && StringUtils.hasText(main.getFullName())) {
|
if (StringUtils.hasText(child.getFullName()) && StringUtils.hasText(main.getFullName())) {
|
||||||
child.setFullName(main.getFullName() + "/" + child.getFullName());
|
child.setFullName(main.getFullName() + "/" + child.getFullName());
|
||||||
|
|
|
||||||
8
pom.xml
8
pom.xml
|
|
@ -22,15 +22,15 @@
|
||||||
<project.build.jdk>${java.version}</project.build.jdk>
|
<project.build.jdk>${java.version}</project.build.jdk>
|
||||||
<!-- 基础通用模块依赖,快照版本表示正在持续迭代.发布后将同步到maven中央仓库 -->
|
<!-- 基础通用模块依赖,快照版本表示正在持续迭代.发布后将同步到maven中央仓库 -->
|
||||||
<!-- https://github.com/hs-web/hsweb-framework -->
|
<!-- https://github.com/hs-web/hsweb-framework -->
|
||||||
<hsweb.framework.version>4.0.18</hsweb.framework.version>
|
<hsweb.framework.version>4.0.20-SNAPSHOT</hsweb.framework.version>
|
||||||
<!-- https://github.com/hs-web/hsweb-easy-orm -->
|
<!-- https://github.com/hs-web/hsweb-easy-orm -->
|
||||||
<easyorm.version>4.1.3</easyorm.version>
|
<easyorm.version>4.1.5-SNAPSHOT</easyorm.version>
|
||||||
<!-- https://github.com/jetlinks/jetlinks -->
|
<!-- https://github.com/jetlinks/jetlinks -->
|
||||||
<jetlinks.version>1.2.4-SNAPSHOT</jetlinks.version>
|
<jetlinks.version>1.2.5-SNAPSHOT</jetlinks.version>
|
||||||
<!-- https://github.com/hs-web/reactor-excel -->
|
<!-- https://github.com/hs-web/reactor-excel -->
|
||||||
<reactor.excel.version>1.0.6</reactor.excel.version>
|
<reactor.excel.version>1.0.6</reactor.excel.version>
|
||||||
<!-- https://github.com/jetlinks/reactor-ql -->
|
<!-- https://github.com/jetlinks/reactor-ql -->
|
||||||
<reactor.ql.version>1.0.18</reactor.ql.version>
|
<reactor.ql.version>1.0.19</reactor.ql.version>
|
||||||
<!-- https://github.com/jetlinks/jetlinks-plugin -->
|
<!-- https://github.com/jetlinks/jetlinks-plugin -->
|
||||||
<jetlinks.plugin.version>1.0.3</jetlinks.plugin.version>
|
<jetlinks.plugin.version>1.0.3</jetlinks.plugin.version>
|
||||||
<!-- https://github.com/jetlinks/jetlinks-sdk -->
|
<!-- https://github.com/jetlinks/jetlinks-sdk -->
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue