Compare commits

...

16 Commits

Author SHA1 Message Date
zhouhao e395e4041e build: 升级依赖 2025-07-23 10:06:47 +08:00
zhouhao e134dfec6f build: 升级依赖版本 2025-06-30 13:54:01 +08:00
zhouhao 3a45ac86da refactor: 升级依赖 2025-06-19 16:05:08 +08:00
zhouhao f99b648b88 refactor: 优化兼容性 2025-06-12 14:05:48 +08:00
zhouhao d4645ad089 refactor: 优化查询条件转换 2025-06-03 19:14:08 +08:00
zhouhao 88e0dd4667 Merge remote-tracking branch 'origin/master' 2025-05-27 10:41:27 +08:00
zhouhao 1bd136f3d2 refactor: 移除无用统计 2025-05-27 10:41:14 +08:00
Zhang Ji fc651ecd9d
fix(场景联动): 修复执行动作的内置参数获取可能失效的问题 (#636) 2025-05-26 19:03:49 +08:00
zhouhao dbceadc5e0 Merge remote-tracking branch 'origin/master' 2025-05-20 15:32:09 +08:00
zhouhao 520ca24fed refactor: 增加ui resource接口 2025-05-20 15:31:49 +08:00
laokou a6100d36f9
fix: 修复MqttClient网络组件定义的元数据名称和类型错误 (#632) 2025-05-06 09:25:11 +08:00
zhouhao 80ed50211a refactor: 优化标签存储 2025-04-29 18:11:00 +08:00
zhouhao 3a1f0b65de fix: 修复设备标签无法更新问题 2025-04-28 18:33:07 +08:00
zhouhao 2cfc78f1da Merge remote-tracking branch 'origin/master' 2025-04-28 11:41:26 +08:00
zhouhao ad34196a2a refactor: mqtt服务接入支持onClientConnect 2025-04-28 11:41:08 +08:00
老周 1e37c685cc
Update README.md 2025-04-27 18:12:06 +08:00
19 changed files with 536 additions and 99 deletions

View File

@ -1,7 +1,7 @@
# JetLinks 物联网基础平台
![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/jetlinks/jetlinks-community/maven.yml?branch=master)
![Version](https://img.shields.io/badge/version-2.2-brightgreen)
![Version](https://img.shields.io/badge/version-2.3-brightgreen)
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/e8d527d692c24633aba4f869c1c5d6ad)](https://app.codacy.com/gh/jetlinks/jetlinks-community?utm_source=github.com&utm_medium=referral&utm_content=jetlinks/jetlinks-community&utm_campaign=Badge_Grade_Settings)
[![OSCS Status](https://www.oscs1024.com/platform/badge/jetlinks/jetlinks-community.svg?size=small)](https://www.oscs1024.com/project/jetlinks/jetlinks-community?ref=badge_small)
[![star](https://img.shields.io/github/stars/jetlinks/jetlinks-community?style=social)](https://github.com/jetlinks/jetlinks-community)

View File

@ -0,0 +1,24 @@
package org.jetlinks.community.configuration;
import org.jetlinks.community.resource.ui.UiMenuResourceProvider;
import org.jetlinks.community.resource.ui.UiResourceProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
@AutoConfiguration
@ConditionalOnProperty(prefix = "jetlinks.ui", name = "enabled", havingValue = "true", matchIfMissing = true)
public class UiResourceConfiguration {
@Bean
public UiResourceProvider uiResourceProvider() {
return new UiResourceProvider();
}
@Bean
public UiMenuResourceProvider uiMenuResourceProvider() {
return new UiMenuResourceProvider();
}
}

View File

@ -0,0 +1,14 @@
package org.jetlinks.community.resource.ui;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.community.resource.ClassPathJsonResourceProvider;
@Slf4j
public class UiMenuResourceProvider extends ClassPathJsonResourceProvider {
public static final String TYPE = "ui-menus";
public UiMenuResourceProvider() {
super(TYPE, "classpath*:/ui/*/baseMenu.json");
}
}

View File

@ -0,0 +1,83 @@
package org.jetlinks.community.resource.ui;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jetlinks.community.resource.Resource;
import org.jetlinks.community.resource.ResourceProvider;
import org.jetlinks.community.resource.SimpleResource;
import org.jetlinks.community.utils.ObjectMappers;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.util.StreamUtils;
import reactor.core.publisher.Flux;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@Slf4j
public class UiResourceProvider implements ResourceProvider {
public static final String TYPE = "ui";
private static final ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
private List<Resource> cache;
@Override
public String getType() {
return TYPE;
}
@Override
public Flux<Resource> getResources() {
return Flux.fromIterable(cache == null ? cache = read() : cache);
}
@SneakyThrows
private List<Resource> read() {
List<Resource> resources = new ArrayList<>();
try {
for (org.springframework.core.io.Resource resource : resolver.getResources("classpath*:/ui/*/package.json")) {
try (InputStream stream = resource.getInputStream()) {
String s = StreamUtils.copyToString(stream, StandardCharsets.UTF_8);
Module m = ObjectMappers.parseJson(s, Module.class);
String path = resource.getURL().getPath();
String[] parts = path.split("/");
if (parts.length > 2) {
m.setPath(parts[parts.length - 3] + "/" + parts[parts.length - 2]);
resources.add(m.toResource());
}
}
}
} catch (Throwable e) {
log.warn("load ui resource error", e);
}
return resources;
}
@Override
public Flux<Resource> getResources(Collection<String> id) {
return Flux.empty();
}
@Getter
@Setter
public static class Module {
private String id;
private String name;
private String description;
private String path;
public SimpleResource toResource() {
id = StringUtils.isBlank(id) ? name : id;
return SimpleResource.of(id, TYPE, ObjectMappers.toJsonString(this));
}
}
}

View File

@ -9,6 +9,7 @@ import org.hswebframework.web.authorization.exception.UnAuthorizedException;
import org.jetlinks.community.resource.Resource;
import org.jetlinks.community.resource.ResourceManager;
import org.jetlinks.community.resource.TypeScriptDeclareResourceProvider;
import org.jetlinks.community.resource.ui.UiResourceProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
@ -17,6 +18,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.HashMap;
@RestController
@RequestMapping("/system/resources")
@ -26,6 +28,15 @@ public class SystemResourcesController {
private final ResourceManager resourceManager;
@GetMapping("/ui")
@SneakyThrows
@Authorize(merge = false)
public Flux<Object> getUIResources() {
return resourceManager
.getResources(UiResourceProvider.TYPE)
.map(resource->resource.as(HashMap.class));
}
@GetMapping("/{type}")
@SneakyThrows
public Flux<String> getResources(@PathVariable String type) {

View File

@ -1 +1,2 @@
org.jetlinks.community.configuration.CommonConfiguration
org.jetlinks.community.configuration.UiResourceConfiguration

View File

@ -110,12 +110,12 @@ public class MqttClientProvider implements NetworkProvider<MqttClientProperties>
return new DefaultConfigMetadata()
.add("id", "id", "", new StringType())
.add("remoteHost", "远程地址", "", new StringType())
.add("remotePort", "远程地址", "", new IntType())
.add("certId", "证书id", "", new StringType())
.add("remotePort", "远程端口", "", new IntType())
.add("certId", "证书ID", "", new StringType())
.add("secure", "开启TSL", "", new BooleanType())
.add("clientId", "客户端ID", "", new BooleanType())
.add("username", "用户名", "", new BooleanType())
.add("password", "密码", "", new BooleanType());
.add("clientId", "客户端ID", "", new StringType())
.add("username", "用户名", "", new StringType())
.add("password", "密码", "", new StringType());
}
@Nonnull

View File

@ -13,12 +13,12 @@ import org.jetlinks.community.network.mqtt.server.MqttConnection;
import org.jetlinks.community.network.mqtt.server.MqttPublishing;
import org.jetlinks.community.network.mqtt.server.MqttServer;
import org.jetlinks.community.utils.ObjectMappers;
import org.jetlinks.community.utils.SystemUtils;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.device.*;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.server.DeviceGatewayContext;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.KeepOnlineSession;
import org.jetlinks.core.trace.FluxTracer;
@ -28,7 +28,6 @@ import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
@ -140,11 +139,11 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
//处理连接并进行认证
private Mono<Tuple3<DeviceOperator, AuthenticationResponse, MqttConnection>> handleConnection(MqttConnection connection) {
return Mono
.justOrEmpty(connection.getAuth())
.flatMap(auth -> {
MqttAuthenticationRequest request = new MqttAuthenticationRequest(connection.getClientId(),
MqttAuthenticationRequest request = new MqttAuthenticationRequest(
connection.getClientId(),
auth.getUsername(),
auth.getPassword(),
getTransport());
@ -161,7 +160,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
})
.flatMap(resp -> {
//认证响应可以自定义设备ID,如果没有则使用mqtt的clientId
String deviceId = StringUtils.isEmpty(resp.getDeviceId()) ? connection.getClientId() : resp.getDeviceId();
String deviceId = StringUtils.hasText(resp.getDeviceId()) ? resp.getDeviceId() : connection.getClientId();
//认证返回了新的设备ID,则使用新的设备
return registry
.getDevice(deviceId)
@ -171,7 +170,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
;
})
.as(MonoTracer
.create(SpanName.auth(connection.getClientId()),
.create(SpanName.auth0(connection.getClientId()),
(span, tp3) -> {
AuthenticationResponse response = tp3.getT2();
if (!response.isSuccess()) {
@ -201,7 +200,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
}
//处理认证结果
private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> handleAuthResponse(DeviceOperator device,
private Mono<Tuple3<MqttConnection, DeviceOperator, DeviceSession>> handleAuthResponse(DeviceOperator device,
AuthenticationResponse resp,
MqttConnection connection) {
return Mono
@ -209,30 +208,6 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
String deviceId = device.getDeviceId();
//认证通过
if (resp.isSuccess()) {
//监听断开连接
connection.onClose(conn -> {
counter.decrement();
//监控信息
monitor.disconnected();
monitor.totalConnection(counter.sum());
sessionManager
.getSession(deviceId, false)
.flatMap(_tmp -> {
//只有与创建的会话相同才移除(下线),因为有可能设置了keepOnline,
//或者设备通过其他方式注册了会话,这里断开连接不能影响到以上情况.
if (_tmp != null && _tmp.isWrapFrom(MqttConnectionSession.class) && !(_tmp instanceof KeepOnlineSession)) {
MqttConnectionSession connectionSession = _tmp.unwrap(MqttConnectionSession.class);
if (connectionSession.getConnection() == conn) {
return sessionManager.remove(deviceId, true);
}
}
return Mono.empty();
})
.subscribe();
});
counter.increment();
return sessionManager
.compute(deviceId, old -> {
MqttConnectionSession newSession = new MqttConnectionSession(deviceId, device, getTransport(), connection, monitor);
@ -246,68 +221,100 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
})
.defaultIfEmpty(newSession);
})
.mapNotNull(session->{
.mapNotNull(session -> {
try {
return Tuples.of(connection.accept(), device, session.unwrap(MqttConnectionSession.class));
return Tuples.of(connection.accept(), device, session);
} catch (IllegalStateException ignore) {
//忽略错误,偶尔可能会出现网络异常,导致accept时,连接已经中断.还有其他更好的处理方式?
return null;
}
})
.doOnNext(o -> {
//监控信息
monitor.connected();
monitor.totalConnection(counter.sum());
})
//会话empty说明注册会话失败?
.switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED)));
} else {
//认证失败返回 0x04 BAD_USER_NAME_OR_PASSWORD
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
monitor.rejected();
log.warn("MQTT客户端认证[{}]失败:{}", deviceId, resp.getMessage());
}
return Mono.empty();
})
.onErrorResume(error -> Mono.fromRunnable(() -> {
log.error(error.getMessage(), error);
monitor.rejected();
//发生错误时应答 SERVER_UNAVAILABLE
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
}))
;
}
protected Mono<Void> handleClientConnect(MqttConnection connection,
DeviceOperator operator) {
return operator
.getProtocol()
.flatMap(supportMono -> supportMono
.onClientConnect(
DefaultTransport.MQTT,
connection,
new DeviceGatewayContext() {
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
@Override
public Mono<DeviceProductOperator> getProduct(String productId) {
return registry.getProduct(productId);
}
@Override
public Mono<Void> onMessage(DeviceMessage message) {
return handleMessage(operator, message, connection)
.then();
}
}
));
}
//处理已经建立连接的MQTT连接
private Mono<Void> handleAcceptedMqttConnection(MqttConnection connection,
DeviceOperator operator,
MqttConnectionSession session) {
return Flux
.usingWhen(Mono.just(connection),
DeviceSession session) {
return this
.handleClientConnect(connection, operator)
.thenMany(Flux.usingWhen(Mono.just(connection),
MqttConnection::handleMessage,
MqttConnection::close)
//网关暂停或者已停止时,则不处理消息
.filter(pb -> isStarted())
.publishOn(Schedulers.parallel())
MqttConnection::close))
//解码收到的mqtt报文
.concatMap(publishing -> this
.concatMap(
publishing -> {
if (!isStarted()) {
return Mono.empty();
}
return this
.decodeAndHandleMessage(operator, session, publishing, connection)
.as(MonoTracer
.create(SpanName.upstream(connection.getClientId()),
(span) -> span.setAttribute(SpanKey.message, publishing.print())))
.create(SpanName.upstream0(connection.getClientId()),
(span) -> span.setAttributeLazy(SpanKey.message, publishing::print)));
},
0
)
.as(flux -> {
MqttMessage will = connection.getWillMessage().orElse(null);
if (will != null) {
//合并遗言消息
.mergeWith(
Mono.justOrEmpty(connection.getWillMessage())
//解码遗言消息
.flatMap(mqttMessage -> this.decodeAndHandleMessage(operator, session, mqttMessage, connection))
)
return flux.mergeWith(
this.decodeAndHandleMessage(operator, session, will, connection)
);
}
return flux;
})
.then();
}
//解码消息并处理
private Mono<Void> decodeAndHandleMessage(DeviceOperator operator,
MqttConnectionSession session,
DeviceSession session,
MqttMessage message,
MqttConnection connection) {
monitor.receivedMessage();

View File

@ -14,6 +14,7 @@ import org.hswebframework.web.exception.BusinessException;
import org.jetlinks.community.tdengine.TDEngineUtils;
import org.jetlinks.community.tdengine.term.TDengineQueryConditionBuilder;
import org.jetlinks.community.things.data.ThingsDataConstants;
import org.jetlinks.community.things.utils.ThingsDatabaseUtils;
import org.jetlinks.core.metadata.Converter;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.community.Interval;
@ -100,11 +101,7 @@ class TDengineThingDataHelper implements Disposable {
.getColumn(metric, term.getColumn())
.ifPresent(meta -> {
DataType type = meta.getValueType();
if (isArrayTerm(type, term)) {
term.setValue(tryConvertList(type, term));
} else if (type instanceof Converter) {
term.setValue(((Converter<?>) type).convert(term.getValue()));
}
ThingsDatabaseUtils.tryConvertTermValue(type, term);
});
}

View File

@ -10,6 +10,8 @@ import org.hswebframework.ezorm.rdb.codec.NumberValueCodec;
import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
import org.jetlinks.community.ConfigMetadataConstants;
import org.jetlinks.community.utils.ConverterUtils;
import org.jetlinks.community.utils.TimeUtils;
import org.jetlinks.core.metadata.Converter;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.types.*;
@ -208,6 +210,33 @@ public class ThingsDatabaseUtils {
}
}
public static void tryConvertTermValue(DataType type,
Term term,
BiFunction<DataType, Object, Object> tryConvertTermValue) {
tryConvertTermValue(type,
term,
ThingsDatabaseUtils::isDoNotConvertValue,
ThingsDatabaseUtils::maybeList,
tryConvertTermValue);
}
public static void tryConvertTermValue(DataType type,
Term term) {
tryConvertTermValue(type,
term,
ThingsDatabaseUtils::tryConvertTermValue);
}
public static Object tryConvertTermValue(DataType type, Object value) {
if (type instanceof DateTimeType) {
return TimeUtils.convertToDate(value).getTime();
} else if (type instanceof Converter) {
return ((Converter<?>) type).convert(value);
}
return value;
}
public static void tryConvertTermValue(DataType type,
Term term,
BiPredicate<DataType, Term> isDoNotConvertValue,

View File

@ -127,24 +127,27 @@ public class DeviceTagEntity extends GenericEntity<String> {
return tag;
}
public DeviceProperty toProperty() {
DeviceProperty property = new DeviceProperty();
property.setProperty(getKey());
property.setDeviceId(deviceId);
property.setType(type);
property.setPropertyName(name);
property.setValue(parseValue());
return property;
}
public Object parseValue() {
DataType type = Optional
.ofNullable(DataTypes.lookup(getType()))
.map(Supplier::get)
.orElseGet(UnknownType::new);
if (type instanceof Converter) {
property.setValue(((Converter<?>) type).convert(getValue()));
return ((Converter<?>) type).convert(getValue());
} else {
property.setValue(getValue());
return getValue();
}
return property;
}
//以物模型标签基础数据为准重构数据库保存的可能已过时的标签数据

View File

@ -0,0 +1,18 @@
package org.jetlinks.community.device.service.tag;
import org.jetlinks.community.buffer.BufferProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties(prefix = "device.tag.synchronizer")
public class DeviceTagProperties extends BufferProperties {
public DeviceTagProperties(){
setFilePath("./data/device-tag-synchronizer");
setSize(500);
setParallelism(1);
getEviction().setMaxSize(100_0000);
}
}

View File

@ -0,0 +1,224 @@
package org.jetlinks.community.device.service.tag;
import lombok.*;
import org.apache.commons.collections4.MapUtils;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.web.crud.events.EntityCreatedEvent;
import org.hswebframework.web.crud.events.EntityDeletedEvent;
import org.hswebframework.web.crud.events.EntityModifyEvent;
import org.hswebframework.web.crud.events.EntitySavedEvent;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.DeviceThingType;
import org.jetlinks.core.message.UpdateTagMessage;
import org.jetlinks.core.things.ThingsDataManager;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.community.buffer.BufferSettings;
import org.jetlinks.community.buffer.PersistenceBuffer;
import org.jetlinks.community.device.entity.DeviceTagEntity;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.community.things.data.ThingsDataWriter;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.Externalizable;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Date;
import java.util.List;
import java.util.Map;
@Component
@RequiredArgsConstructor
public class DeviceTagSynchronizer implements CommandLineRunner {
private final DeviceTagProperties properties;
private final DeviceRegistry registry;
private final ThingsDataWriter dataWriter;
private final ThingsDataManager dataManager;
private final ReactiveRepository<DeviceTagEntity, String> tagRepository;
public PersistenceBuffer<DeviceTagBuffer> buffer;
@Subscribe(value = "/device/*/*/message/tags/update")
public Mono<Void> updateDeviceTag(UpdateTagMessage message) {
Map<String, Object> tags = message.getTags();
if (MapUtils.isEmpty(tags)) {
return Mono.empty();
}
String deviceId = message.getDeviceId();
return registry
.getDevice(deviceId)
.flatMap(DeviceOperator::getMetadata)
.flatMapMany(metadata -> Flux
.fromIterable(tags.entrySet())
.filter(e -> e.getValue() != null)
.flatMap(e -> {
DeviceTagEntity tagEntity = metadata
.getTag(e.getKey())
.map(tagMeta -> DeviceTagEntity.of(tagMeta, e.getValue()))
.orElseGet(() -> {
DeviceTagEntity entity = new DeviceTagEntity();
entity.setKey(e.getKey());
entity.setType("string");
entity.setName(e.getKey());
entity.setCreateTime(new Date());
entity.setDescription("设备上报");
entity.setValue(String.valueOf(e.getValue()));
return entity;
});
tagEntity.setTimestamp(message.getTimestamp());
tagEntity.setDeviceId(deviceId);
tagEntity.setId(DeviceTagEntity.createTagId(deviceId, tagEntity.getKey()));
return dataWriter
.updateTag(DeviceThingType.device.getId(),
tagEntity.getDeviceId(),
tagEntity.getKey(),
System.currentTimeMillis(),
e.getValue())
.then(writeBuffer(tagEntity));
}))
.then();
}
public Mono<Void> writeBuffer(DeviceTagEntity entity) {
return buffer.writeAsync(new DeviceTagBuffer(entity));
}
private Mono<DeviceTagEntity> convertEntity(DeviceTagBuffer buffer) {
//从最新缓存中获取最新的数据,并填入准备入库的实体中
return dataManager
.getLastTag(DeviceThingType.device.getId(),
buffer.getTag().getDeviceId(),
buffer.getTag().getKey(),
System.currentTimeMillis())
.map(tag -> {
//缓存中的数据比buffer中的新,则更新为buffer中的数据
if (tag.getTimestamp() >= buffer.tag.getTimestamp()) {
buffer.getTag().setTimestamp(tag.getTimestamp());
buffer.getTag().setValue(String.valueOf(tag.getValue()));
}
return buffer.getTag();
})
.defaultIfEmpty(buffer.tag);
}
public Mono<Boolean> handleBuffer(Flux<DeviceTagBuffer> buffer) {
return tagRepository
.save(buffer.flatMap(this::convertEntity))
.contextWrite(ctx -> ctx.put(DeviceTagSynchronizer.class, this))
.then(Reactors.ALWAYS_FALSE);
}
@EventListener
public void handleDeviceTagEvent(EntityCreatedEvent<DeviceTagEntity> event) {
event.async(updateTag(event.getEntity()));
}
@EventListener
public void handleDeviceTagEvent(EntitySavedEvent<DeviceTagEntity> event) {
event.async(updateTag(event.getEntity()));
}
@EventListener
public void handleDeviceTagEvent(EntityModifyEvent<DeviceTagEntity> event) {
event.async(updateTag(event.getAfter()));
}
@EventListener
public void handleDeviceTagEvent(EntityDeletedEvent<DeviceTagEntity> event) {
event.async(
Flux
.fromIterable(event.getEntity())
.flatMap(entity -> dataWriter
.removeTag(DeviceThingType.device.getId(),
entity.getDeviceId(),
entity.getKey())
.then()
));
}
/**
* 更新标签,界面上手动修改标签?
*
* @param entityList 标签
* @return Void
*/
private Mono<Void> updateTag(List<DeviceTagEntity> entityList) {
return Mono.deferContextual(ctx -> {
//更新来自消息的标签,不需要再次更新
if (ctx.hasKey(DeviceTagSynchronizer.class)) {
return Mono.empty();
}
return Flux
.fromIterable(entityList)
.flatMap(entity -> dataWriter
.updateTag(DeviceThingType.device.getId(),
entity.getDeviceId(),
entity.getKey(),
System.currentTimeMillis(),
entity.parseValue()))
.then();
});
}
@PostConstruct
public void init() {
buffer = new PersistenceBuffer<>(
BufferSettings.create(properties),
DeviceTagBuffer::new,
this::handleBuffer)
.name("device-tag-synchronizer");
buffer.init();
}
@PreDestroy
public void shutdown() {
buffer.stop();
}
@Override
public void run(String... args) throws Exception {
buffer.start();
SpringApplication
.getShutdownHandlers()
.add(buffer::dispose);
}
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public static class DeviceTagBuffer implements Externalizable {
private DeviceTagEntity tag;
@Override
public void writeExternal(ObjectOutput out) {
tag.writeExternal(out);
}
@Override
public void readExternal(ObjectInput in) {
tag = new DeviceTagEntity();
tag.readExternal(in);
}
}
}

View File

@ -2,6 +2,7 @@ package org.jetlinks.community.rule.engine.alarm;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.i18n.LocaleUtils;
@ -471,6 +472,8 @@ public class DefaultAlarmHandler implements AlarmHandler {
}
}
@Getter
@Setter
public static class TriggerCache implements Externalizable {
static final byte stateNormal = 0x01;
@ -530,6 +533,8 @@ public class DefaultAlarmHandler implements AlarmHandler {
}
}
@Getter
@Setter
public static class RelieveCache implements Externalizable {
private long reliveTime;

View File

@ -434,6 +434,8 @@ public class DefaultAlarmRuleHandler implements AlarmRuleHandler, CommandLineRun
}
@Deprecated
@Getter
@Setter
public static class RecordCache implements Externalizable {
static final byte stateNormal = 0x01;

View File

@ -26,18 +26,18 @@ public class AlarmRecordMeasurementProvider extends StaticMeasurementProvider {
TimeSeriesManager timeSeriesManager) {
super(AlarmDashboardDefinition.alarm, AlarmObjectDefinition.record);
registry = registryManager.getMeterRegister(AlarmTimeSeriesMetric.alarmStreamMetrics().getId());
// registry = registryManager.getMeterRegister(AlarmTimeSeriesMetric.alarmStreamMetrics().getId());
addMeasurement(new AlarmRecordTrendMeasurement(timeSeriesManager));
addMeasurement(new AlarmRecordRankMeasurement(timeSeriesManager));
}
@EventListener
public void aggAlarmRecord(AlarmHistoryInfo info) {
registry
.counter("record-agg", getTags(info))
.increment();
}
// @EventListener
// public void aggAlarmRecord(AlarmHistoryInfo info) {
// registry
// .counter("record-agg", getTags(info))
// .increment();
// }

View File

@ -219,15 +219,32 @@ public class SceneUtils {
}
}
public static void refactorUpperKey(DeviceSelectorSpec deviceSelectorSpec) {
@SuppressWarnings("all")
public static void refactorUpperKey(Object source) {
// 将变量格式改为与查询的别名一致
if (VariableSource.Source.upper.equals(deviceSelectorSpec.getSource())) {
if (source instanceof VariableSource) {
VariableSource variableSource = (VariableSource) source;
if (VariableSource.Source.upper.equals(variableSource.getSource())) {
variableSource.setUpperKey(transferSceneUpperKey(variableSource.getUpperKey()));
}
}
if (source instanceof Map) {
Map<String, Object> map = (Map<String, Object>) source;
VariableSource variableSource = VariableSource.of(source);
// 将变量格式改为与查询的别名一致
if (VariableSource.Source.upper.equals(variableSource.getSource())) {
map.put("upperKey", transferSceneUpperKey(variableSource.getUpperKey()));
}
}
}
public static String transferSceneUpperKey(String upperKey) {
// scene.xx.current -> scene.scene_xx_current
if (deviceSelectorSpec.getUpperKey().startsWith("scene.")) {
String alias = SceneUtils.createColumnAlias("properties", deviceSelectorSpec.getUpperKey(), false);
deviceSelectorSpec.setUpperKey("scene." + alias);
}
if (upperKey.startsWith("scene.")) {
String alias = SceneUtils.createColumnAlias("scene", upperKey, false);
return "scene." + alias;
}
return upperKey;
}
private static boolean isContainThis(String[] arr) {

View File

@ -148,11 +148,13 @@ public class Variable {
}
public void refactorPrefix(Variable main) {
id = SceneUtils.transferSceneUpperKey(id);
if (CollectionUtils.isNotEmpty(children)) {
for (Variable child : children) {
if (!child.getId().startsWith(main.id + ".")) {
child.setId(main.id + "." + child.getId());
}
child.setId(SceneUtils.transferSceneUpperKey(child.getId()));
if (StringUtils.hasText(child.getFullName()) && StringUtils.hasText(main.getFullName())) {
child.setFullName(main.getFullName() + "/" + child.getFullName());

View File

@ -22,15 +22,15 @@
<project.build.jdk>${java.version}</project.build.jdk>
<!-- 基础通用模块依赖,快照版本表示正在持续迭代.发布后将同步到maven中央仓库 -->
<!-- https://github.com/hs-web/hsweb-framework -->
<hsweb.framework.version>4.0.18</hsweb.framework.version>
<hsweb.framework.version>4.0.20-SNAPSHOT</hsweb.framework.version>
<!-- https://github.com/hs-web/hsweb-easy-orm -->
<easyorm.version>4.1.3</easyorm.version>
<easyorm.version>4.1.5-SNAPSHOT</easyorm.version>
<!-- https://github.com/jetlinks/jetlinks -->
<jetlinks.version>1.2.4-SNAPSHOT</jetlinks.version>
<jetlinks.version>1.2.5-SNAPSHOT</jetlinks.version>
<!-- https://github.com/hs-web/reactor-excel -->
<reactor.excel.version>1.0.6</reactor.excel.version>
<!-- https://github.com/jetlinks/reactor-ql -->
<reactor.ql.version>1.0.18</reactor.ql.version>
<reactor.ql.version>1.0.19</reactor.ql.version>
<!-- https://github.com/jetlinks/jetlinks-plugin -->
<jetlinks.plugin.version>1.0.3</jetlinks.plugin.version>
<!-- https://github.com/jetlinks/jetlinks-sdk -->