Compare commits

..

No commits in common. "master" and "2.3.0" have entirely different histories.

19 changed files with 99 additions and 536 deletions

View File

@ -1,7 +1,7 @@
# JetLinks 物联网基础平台
![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/jetlinks/jetlinks-community/maven.yml?branch=master)
![Version](https://img.shields.io/badge/version-2.3-brightgreen)
![Version](https://img.shields.io/badge/version-2.2-brightgreen)
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/e8d527d692c24633aba4f869c1c5d6ad)](https://app.codacy.com/gh/jetlinks/jetlinks-community?utm_source=github.com&utm_medium=referral&utm_content=jetlinks/jetlinks-community&utm_campaign=Badge_Grade_Settings)
[![OSCS Status](https://www.oscs1024.com/platform/badge/jetlinks/jetlinks-community.svg?size=small)](https://www.oscs1024.com/project/jetlinks/jetlinks-community?ref=badge_small)
[![star](https://img.shields.io/github/stars/jetlinks/jetlinks-community?style=social)](https://github.com/jetlinks/jetlinks-community)

View File

@ -1,24 +0,0 @@
package org.jetlinks.community.configuration;
import org.jetlinks.community.resource.ui.UiMenuResourceProvider;
import org.jetlinks.community.resource.ui.UiResourceProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
@AutoConfiguration
@ConditionalOnProperty(prefix = "jetlinks.ui", name = "enabled", havingValue = "true", matchIfMissing = true)
public class UiResourceConfiguration {
@Bean
public UiResourceProvider uiResourceProvider() {
return new UiResourceProvider();
}
@Bean
public UiMenuResourceProvider uiMenuResourceProvider() {
return new UiMenuResourceProvider();
}
}

View File

@ -1,14 +0,0 @@
package org.jetlinks.community.resource.ui;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.community.resource.ClassPathJsonResourceProvider;
@Slf4j
public class UiMenuResourceProvider extends ClassPathJsonResourceProvider {
public static final String TYPE = "ui-menus";
public UiMenuResourceProvider() {
super(TYPE, "classpath*:/ui/*/baseMenu.json");
}
}

View File

@ -1,83 +0,0 @@
package org.jetlinks.community.resource.ui;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jetlinks.community.resource.Resource;
import org.jetlinks.community.resource.ResourceProvider;
import org.jetlinks.community.resource.SimpleResource;
import org.jetlinks.community.utils.ObjectMappers;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.util.StreamUtils;
import reactor.core.publisher.Flux;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@Slf4j
public class UiResourceProvider implements ResourceProvider {
public static final String TYPE = "ui";
private static final ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
private List<Resource> cache;
@Override
public String getType() {
return TYPE;
}
@Override
public Flux<Resource> getResources() {
return Flux.fromIterable(cache == null ? cache = read() : cache);
}
@SneakyThrows
private List<Resource> read() {
List<Resource> resources = new ArrayList<>();
try {
for (org.springframework.core.io.Resource resource : resolver.getResources("classpath*:/ui/*/package.json")) {
try (InputStream stream = resource.getInputStream()) {
String s = StreamUtils.copyToString(stream, StandardCharsets.UTF_8);
Module m = ObjectMappers.parseJson(s, Module.class);
String path = resource.getURL().getPath();
String[] parts = path.split("/");
if (parts.length > 2) {
m.setPath(parts[parts.length - 3] + "/" + parts[parts.length - 2]);
resources.add(m.toResource());
}
}
}
} catch (Throwable e) {
log.warn("load ui resource error", e);
}
return resources;
}
@Override
public Flux<Resource> getResources(Collection<String> id) {
return Flux.empty();
}
@Getter
@Setter
public static class Module {
private String id;
private String name;
private String description;
private String path;
public SimpleResource toResource() {
id = StringUtils.isBlank(id) ? name : id;
return SimpleResource.of(id, TYPE, ObjectMappers.toJsonString(this));
}
}
}

View File

@ -9,7 +9,6 @@ import org.hswebframework.web.authorization.exception.UnAuthorizedException;
import org.jetlinks.community.resource.Resource;
import org.jetlinks.community.resource.ResourceManager;
import org.jetlinks.community.resource.TypeScriptDeclareResourceProvider;
import org.jetlinks.community.resource.ui.UiResourceProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
@ -18,7 +17,6 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.HashMap;
@RestController
@RequestMapping("/system/resources")
@ -28,15 +26,6 @@ public class SystemResourcesController {
private final ResourceManager resourceManager;
@GetMapping("/ui")
@SneakyThrows
@Authorize(merge = false)
public Flux<Object> getUIResources() {
return resourceManager
.getResources(UiResourceProvider.TYPE)
.map(resource->resource.as(HashMap.class));
}
@GetMapping("/{type}")
@SneakyThrows
public Flux<String> getResources(@PathVariable String type) {

View File

@ -1,2 +1 @@
org.jetlinks.community.configuration.CommonConfiguration
org.jetlinks.community.configuration.UiResourceConfiguration
org.jetlinks.community.configuration.CommonConfiguration

View File

@ -110,12 +110,12 @@ public class MqttClientProvider implements NetworkProvider<MqttClientProperties>
return new DefaultConfigMetadata()
.add("id", "id", "", new StringType())
.add("remoteHost", "远程地址", "", new StringType())
.add("remotePort", "远程端口", "", new IntType())
.add("certId", "证书ID", "", new StringType())
.add("remotePort", "远程地址", "", new IntType())
.add("certId", "证书id", "", new StringType())
.add("secure", "开启TSL", "", new BooleanType())
.add("clientId", "客户端ID", "", new StringType())
.add("username", "用户名", "", new StringType())
.add("password", "密码", "", new StringType());
.add("clientId", "客户端ID", "", new BooleanType())
.add("username", "用户名", "", new BooleanType())
.add("password", "密码", "", new BooleanType());
}
@Nonnull

View File

@ -13,12 +13,12 @@ import org.jetlinks.community.network.mqtt.server.MqttConnection;
import org.jetlinks.community.network.mqtt.server.MqttPublishing;
import org.jetlinks.community.network.mqtt.server.MqttServer;
import org.jetlinks.community.utils.ObjectMappers;
import org.jetlinks.community.utils.SystemUtils;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.device.*;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.server.DeviceGatewayContext;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.KeepOnlineSession;
import org.jetlinks.core.trace.FluxTracer;
@ -28,6 +28,7 @@ import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
@ -139,14 +140,14 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
//处理连接并进行认证
private Mono<Tuple3<DeviceOperator, AuthenticationResponse, MqttConnection>> handleConnection(MqttConnection connection) {
return Mono
.justOrEmpty(connection.getAuth())
.flatMap(auth -> {
MqttAuthenticationRequest request = new MqttAuthenticationRequest(
connection.getClientId(),
auth.getUsername(),
auth.getPassword(),
getTransport());
MqttAuthenticationRequest request = new MqttAuthenticationRequest(connection.getClientId(),
auth.getUsername(),
auth.getPassword(),
getTransport());
return supportMono
//使用自定义协议来认证
.map(support -> support.authenticate(request, registry))
@ -160,7 +161,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
})
.flatMap(resp -> {
//认证响应可以自定义设备ID,如果没有则使用mqtt的clientId
String deviceId = StringUtils.hasText(resp.getDeviceId()) ? resp.getDeviceId() : connection.getClientId();
String deviceId = StringUtils.isEmpty(resp.getDeviceId()) ? connection.getClientId() : resp.getDeviceId();
//认证返回了新的设备ID,则使用新的设备
return registry
.getDevice(deviceId)
@ -170,7 +171,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
;
})
.as(MonoTracer
.create(SpanName.auth0(connection.getClientId()),
.create(SpanName.auth(connection.getClientId()),
(span, tp3) -> {
AuthenticationResponse response = tp3.getT2();
if (!response.isSuccess()) {
@ -200,14 +201,38 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
}
//处理认证结果
private Mono<Tuple3<MqttConnection, DeviceOperator, DeviceSession>> handleAuthResponse(DeviceOperator device,
AuthenticationResponse resp,
MqttConnection connection) {
private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> handleAuthResponse(DeviceOperator device,
AuthenticationResponse resp,
MqttConnection connection) {
return Mono
.defer(() -> {
String deviceId = device.getDeviceId();
//认证通过
if (resp.isSuccess()) {
//监听断开连接
connection.onClose(conn -> {
counter.decrement();
//监控信息
monitor.disconnected();
monitor.totalConnection(counter.sum());
sessionManager
.getSession(deviceId, false)
.flatMap(_tmp -> {
//只有与创建的会话相同才移除(下线),因为有可能设置了keepOnline,
//或者设备通过其他方式注册了会话,这里断开连接不能影响到以上情况.
if (_tmp != null && _tmp.isWrapFrom(MqttConnectionSession.class) && !(_tmp instanceof KeepOnlineSession)) {
MqttConnectionSession connectionSession = _tmp.unwrap(MqttConnectionSession.class);
if (connectionSession.getConnection() == conn) {
return sessionManager.remove(deviceId, true);
}
}
return Mono.empty();
})
.subscribe();
});
counter.increment();
return sessionManager
.compute(deviceId, old -> {
MqttConnectionSession newSession = new MqttConnectionSession(deviceId, device, getTransport(), connection, monitor);
@ -221,100 +246,68 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway {
})
.defaultIfEmpty(newSession);
})
.mapNotNull(session -> {
.mapNotNull(session->{
try {
return Tuples.of(connection.accept(), device, session);
return Tuples.of(connection.accept(), device, session.unwrap(MqttConnectionSession.class));
} catch (IllegalStateException ignore) {
//忽略错误,偶尔可能会出现网络异常,导致accept时,连接已经中断.还有其他更好的处理方式?
return null;
}
})
.doOnNext(o -> {
//监控信息
monitor.connected();
monitor.totalConnection(counter.sum());
})
//会话empty说明注册会话失败?
.switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED)));
} else {
//认证失败返回 0x04 BAD_USER_NAME_OR_PASSWORD
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
monitor.rejected();
log.warn("MQTT客户端认证[{}]失败:{}", deviceId, resp.getMessage());
}
return Mono.empty();
})
.onErrorResume(error -> Mono.fromRunnable(() -> {
log.error(error.getMessage(), error);
monitor.rejected();
//发生错误时应答 SERVER_UNAVAILABLE
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
}))
;
}
protected Mono<Void> handleClientConnect(MqttConnection connection,
DeviceOperator operator) {
return operator
.getProtocol()
.flatMap(supportMono -> supportMono
.onClientConnect(
DefaultTransport.MQTT,
connection,
new DeviceGatewayContext() {
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
@Override
public Mono<DeviceProductOperator> getProduct(String productId) {
return registry.getProduct(productId);
}
@Override
public Mono<Void> onMessage(DeviceMessage message) {
return handleMessage(operator, message, connection)
.then();
}
}
));
}
//处理已经建立连接的MQTT连接
private Mono<Void> handleAcceptedMqttConnection(MqttConnection connection,
DeviceOperator operator,
DeviceSession session) {
return this
.handleClientConnect(connection, operator)
.thenMany(Flux.usingWhen(Mono.just(connection),
MqttConnection::handleMessage,
MqttConnection::close))
MqttConnectionSession session) {
return Flux
.usingWhen(Mono.just(connection),
MqttConnection::handleMessage,
MqttConnection::close)
//网关暂停或者已停止时,则不处理消息
.filter(pb -> isStarted())
.publishOn(Schedulers.parallel())
//解码收到的mqtt报文
.concatMap(
publishing -> {
if (!isStarted()) {
return Mono.empty();
}
return this
.decodeAndHandleMessage(operator, session, publishing, connection)
.as(MonoTracer
.create(SpanName.upstream0(connection.getClientId()),
(span) -> span.setAttributeLazy(SpanKey.message, publishing::print)));
},
0
.concatMap(publishing -> this
.decodeAndHandleMessage(operator, session, publishing, connection)
.as(MonoTracer
.create(SpanName.upstream(connection.getClientId()),
(span) -> span.setAttribute(SpanKey.message, publishing.print())))
)
//合并遗言消息
.mergeWith(
Mono.justOrEmpty(connection.getWillMessage())
//解码遗言消息
.flatMap(mqttMessage -> this.decodeAndHandleMessage(operator, session, mqttMessage, connection))
)
.as(flux -> {
MqttMessage will = connection.getWillMessage().orElse(null);
if (will != null) {
//合并遗言消息
return flux.mergeWith(
this.decodeAndHandleMessage(operator, session, will, connection)
);
}
return flux;
})
.then();
}
//解码消息并处理
private Mono<Void> decodeAndHandleMessage(DeviceOperator operator,
DeviceSession session,
MqttConnectionSession session,
MqttMessage message,
MqttConnection connection) {
monitor.receivedMessage();

View File

@ -14,7 +14,6 @@ import org.hswebframework.web.exception.BusinessException;
import org.jetlinks.community.tdengine.TDEngineUtils;
import org.jetlinks.community.tdengine.term.TDengineQueryConditionBuilder;
import org.jetlinks.community.things.data.ThingsDataConstants;
import org.jetlinks.community.things.utils.ThingsDatabaseUtils;
import org.jetlinks.core.metadata.Converter;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.community.Interval;
@ -101,7 +100,11 @@ class TDengineThingDataHelper implements Disposable {
.getColumn(metric, term.getColumn())
.ifPresent(meta -> {
DataType type = meta.getValueType();
ThingsDatabaseUtils.tryConvertTermValue(type, term);
if (isArrayTerm(type, term)) {
term.setValue(tryConvertList(type, term));
} else if (type instanceof Converter) {
term.setValue(((Converter<?>) type).convert(term.getValue()));
}
});
}

View File

@ -10,8 +10,6 @@ import org.hswebframework.ezorm.rdb.codec.NumberValueCodec;
import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
import org.jetlinks.community.ConfigMetadataConstants;
import org.jetlinks.community.utils.ConverterUtils;
import org.jetlinks.community.utils.TimeUtils;
import org.jetlinks.core.metadata.Converter;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.types.*;
@ -210,33 +208,6 @@ public class ThingsDatabaseUtils {
}
}
public static void tryConvertTermValue(DataType type,
Term term,
BiFunction<DataType, Object, Object> tryConvertTermValue) {
tryConvertTermValue(type,
term,
ThingsDatabaseUtils::isDoNotConvertValue,
ThingsDatabaseUtils::maybeList,
tryConvertTermValue);
}
public static void tryConvertTermValue(DataType type,
Term term) {
tryConvertTermValue(type,
term,
ThingsDatabaseUtils::tryConvertTermValue);
}
public static Object tryConvertTermValue(DataType type, Object value) {
if (type instanceof DateTimeType) {
return TimeUtils.convertToDate(value).getTime();
} else if (type instanceof Converter) {
return ((Converter<?>) type).convert(value);
}
return value;
}
public static void tryConvertTermValue(DataType type,
Term term,
BiPredicate<DataType, Term> isDoNotConvertValue,

View File

@ -127,27 +127,24 @@ public class DeviceTagEntity extends GenericEntity<String> {
return tag;
}
public DeviceProperty toProperty() {
DeviceProperty property = new DeviceProperty();
property.setProperty(getKey());
property.setDeviceId(deviceId);
property.setType(type);
property.setPropertyName(name);
property.setValue(parseValue());
return property;
}
public Object parseValue() {
DataType type = Optional
.ofNullable(DataTypes.lookup(getType()))
.map(Supplier::get)
.orElseGet(UnknownType::new);
if (type instanceof Converter) {
return ((Converter<?>) type).convert(getValue());
property.setValue(((Converter<?>) type).convert(getValue()));
} else {
return getValue();
property.setValue(getValue());
}
return property;
}
//以物模型标签基础数据为准重构数据库保存的可能已过时的标签数据

View File

@ -1,18 +0,0 @@
package org.jetlinks.community.device.service.tag;
import org.jetlinks.community.buffer.BufferProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties(prefix = "device.tag.synchronizer")
public class DeviceTagProperties extends BufferProperties {
public DeviceTagProperties(){
setFilePath("./data/device-tag-synchronizer");
setSize(500);
setParallelism(1);
getEviction().setMaxSize(100_0000);
}
}

View File

@ -1,224 +0,0 @@
package org.jetlinks.community.device.service.tag;
import lombok.*;
import org.apache.commons.collections4.MapUtils;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.web.crud.events.EntityCreatedEvent;
import org.hswebframework.web.crud.events.EntityDeletedEvent;
import org.hswebframework.web.crud.events.EntityModifyEvent;
import org.hswebframework.web.crud.events.EntitySavedEvent;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.DeviceThingType;
import org.jetlinks.core.message.UpdateTagMessage;
import org.jetlinks.core.things.ThingsDataManager;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.community.buffer.BufferSettings;
import org.jetlinks.community.buffer.PersistenceBuffer;
import org.jetlinks.community.device.entity.DeviceTagEntity;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.community.things.data.ThingsDataWriter;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.Externalizable;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Date;
import java.util.List;
import java.util.Map;
@Component
@RequiredArgsConstructor
public class DeviceTagSynchronizer implements CommandLineRunner {
private final DeviceTagProperties properties;
private final DeviceRegistry registry;
private final ThingsDataWriter dataWriter;
private final ThingsDataManager dataManager;
private final ReactiveRepository<DeviceTagEntity, String> tagRepository;
public PersistenceBuffer<DeviceTagBuffer> buffer;
@Subscribe(value = "/device/*/*/message/tags/update")
public Mono<Void> updateDeviceTag(UpdateTagMessage message) {
Map<String, Object> tags = message.getTags();
if (MapUtils.isEmpty(tags)) {
return Mono.empty();
}
String deviceId = message.getDeviceId();
return registry
.getDevice(deviceId)
.flatMap(DeviceOperator::getMetadata)
.flatMapMany(metadata -> Flux
.fromIterable(tags.entrySet())
.filter(e -> e.getValue() != null)
.flatMap(e -> {
DeviceTagEntity tagEntity = metadata
.getTag(e.getKey())
.map(tagMeta -> DeviceTagEntity.of(tagMeta, e.getValue()))
.orElseGet(() -> {
DeviceTagEntity entity = new DeviceTagEntity();
entity.setKey(e.getKey());
entity.setType("string");
entity.setName(e.getKey());
entity.setCreateTime(new Date());
entity.setDescription("设备上报");
entity.setValue(String.valueOf(e.getValue()));
return entity;
});
tagEntity.setTimestamp(message.getTimestamp());
tagEntity.setDeviceId(deviceId);
tagEntity.setId(DeviceTagEntity.createTagId(deviceId, tagEntity.getKey()));
return dataWriter
.updateTag(DeviceThingType.device.getId(),
tagEntity.getDeviceId(),
tagEntity.getKey(),
System.currentTimeMillis(),
e.getValue())
.then(writeBuffer(tagEntity));
}))
.then();
}
public Mono<Void> writeBuffer(DeviceTagEntity entity) {
return buffer.writeAsync(new DeviceTagBuffer(entity));
}
private Mono<DeviceTagEntity> convertEntity(DeviceTagBuffer buffer) {
//从最新缓存中获取最新的数据,并填入准备入库的实体中
return dataManager
.getLastTag(DeviceThingType.device.getId(),
buffer.getTag().getDeviceId(),
buffer.getTag().getKey(),
System.currentTimeMillis())
.map(tag -> {
//缓存中的数据比buffer中的新,则更新为buffer中的数据
if (tag.getTimestamp() >= buffer.tag.getTimestamp()) {
buffer.getTag().setTimestamp(tag.getTimestamp());
buffer.getTag().setValue(String.valueOf(tag.getValue()));
}
return buffer.getTag();
})
.defaultIfEmpty(buffer.tag);
}
public Mono<Boolean> handleBuffer(Flux<DeviceTagBuffer> buffer) {
return tagRepository
.save(buffer.flatMap(this::convertEntity))
.contextWrite(ctx -> ctx.put(DeviceTagSynchronizer.class, this))
.then(Reactors.ALWAYS_FALSE);
}
@EventListener
public void handleDeviceTagEvent(EntityCreatedEvent<DeviceTagEntity> event) {
event.async(updateTag(event.getEntity()));
}
@EventListener
public void handleDeviceTagEvent(EntitySavedEvent<DeviceTagEntity> event) {
event.async(updateTag(event.getEntity()));
}
@EventListener
public void handleDeviceTagEvent(EntityModifyEvent<DeviceTagEntity> event) {
event.async(updateTag(event.getAfter()));
}
@EventListener
public void handleDeviceTagEvent(EntityDeletedEvent<DeviceTagEntity> event) {
event.async(
Flux
.fromIterable(event.getEntity())
.flatMap(entity -> dataWriter
.removeTag(DeviceThingType.device.getId(),
entity.getDeviceId(),
entity.getKey())
.then()
));
}
/**
* 更新标签,界面上手动修改标签?
*
* @param entityList 标签
* @return Void
*/
private Mono<Void> updateTag(List<DeviceTagEntity> entityList) {
return Mono.deferContextual(ctx -> {
//更新来自消息的标签,不需要再次更新
if (ctx.hasKey(DeviceTagSynchronizer.class)) {
return Mono.empty();
}
return Flux
.fromIterable(entityList)
.flatMap(entity -> dataWriter
.updateTag(DeviceThingType.device.getId(),
entity.getDeviceId(),
entity.getKey(),
System.currentTimeMillis(),
entity.parseValue()))
.then();
});
}
@PostConstruct
public void init() {
buffer = new PersistenceBuffer<>(
BufferSettings.create(properties),
DeviceTagBuffer::new,
this::handleBuffer)
.name("device-tag-synchronizer");
buffer.init();
}
@PreDestroy
public void shutdown() {
buffer.stop();
}
@Override
public void run(String... args) throws Exception {
buffer.start();
SpringApplication
.getShutdownHandlers()
.add(buffer::dispose);
}
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public static class DeviceTagBuffer implements Externalizable {
private DeviceTagEntity tag;
@Override
public void writeExternal(ObjectOutput out) {
tag.writeExternal(out);
}
@Override
public void readExternal(ObjectInput in) {
tag = new DeviceTagEntity();
tag.readExternal(in);
}
}
}

View File

@ -2,7 +2,6 @@ package org.jetlinks.community.rule.engine.alarm;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.i18n.LocaleUtils;
@ -472,8 +471,6 @@ public class DefaultAlarmHandler implements AlarmHandler {
}
}
@Getter
@Setter
public static class TriggerCache implements Externalizable {
static final byte stateNormal = 0x01;
@ -533,8 +530,6 @@ public class DefaultAlarmHandler implements AlarmHandler {
}
}
@Getter
@Setter
public static class RelieveCache implements Externalizable {
private long reliveTime;

View File

@ -434,8 +434,6 @@ public class DefaultAlarmRuleHandler implements AlarmRuleHandler, CommandLineRun
}
@Deprecated
@Getter
@Setter
public static class RecordCache implements Externalizable {
static final byte stateNormal = 0x01;

View File

@ -26,18 +26,18 @@ public class AlarmRecordMeasurementProvider extends StaticMeasurementProvider {
TimeSeriesManager timeSeriesManager) {
super(AlarmDashboardDefinition.alarm, AlarmObjectDefinition.record);
// registry = registryManager.getMeterRegister(AlarmTimeSeriesMetric.alarmStreamMetrics().getId());
registry = registryManager.getMeterRegister(AlarmTimeSeriesMetric.alarmStreamMetrics().getId());
addMeasurement(new AlarmRecordTrendMeasurement(timeSeriesManager));
addMeasurement(new AlarmRecordRankMeasurement(timeSeriesManager));
}
// @EventListener
// public void aggAlarmRecord(AlarmHistoryInfo info) {
// registry
// .counter("record-agg", getTags(info))
// .increment();
// }
@EventListener
public void aggAlarmRecord(AlarmHistoryInfo info) {
registry
.counter("record-agg", getTags(info))
.increment();
}

View File

@ -219,32 +219,15 @@ public class SceneUtils {
}
}
@SuppressWarnings("all")
public static void refactorUpperKey(Object source) {
public static void refactorUpperKey(DeviceSelectorSpec deviceSelectorSpec) {
// 将变量格式改为与查询的别名一致
if (source instanceof VariableSource) {
VariableSource variableSource = (VariableSource) source;
if (VariableSource.Source.upper.equals(variableSource.getSource())) {
variableSource.setUpperKey(transferSceneUpperKey(variableSource.getUpperKey()));
if (VariableSource.Source.upper.equals(deviceSelectorSpec.getSource())) {
// scene.xx.current -> scene.scene_xx_current
if (deviceSelectorSpec.getUpperKey().startsWith("scene.")) {
String alias = SceneUtils.createColumnAlias("properties", deviceSelectorSpec.getUpperKey(), false);
deviceSelectorSpec.setUpperKey("scene." + alias);
}
}
if (source instanceof Map) {
Map<String, Object> map = (Map<String, Object>) source;
VariableSource variableSource = VariableSource.of(source);
// 将变量格式改为与查询的别名一致
if (VariableSource.Source.upper.equals(variableSource.getSource())) {
map.put("upperKey", transferSceneUpperKey(variableSource.getUpperKey()));
}
}
}
public static String transferSceneUpperKey(String upperKey) {
// scene.xx.current -> scene.scene_xx_current
if (upperKey.startsWith("scene.")) {
String alias = SceneUtils.createColumnAlias("scene", upperKey, false);
return "scene." + alias;
}
return upperKey;
}
private static boolean isContainThis(String[] arr) {

View File

@ -148,13 +148,11 @@ public class Variable {
}
public void refactorPrefix(Variable main) {
id = SceneUtils.transferSceneUpperKey(id);
if (CollectionUtils.isNotEmpty(children)) {
for (Variable child : children) {
if (!child.getId().startsWith(main.id + ".")) {
child.setId(main.id + "." + child.getId());
}
child.setId(SceneUtils.transferSceneUpperKey(child.getId()));
if (StringUtils.hasText(child.getFullName()) && StringUtils.hasText(main.getFullName())) {
child.setFullName(main.getFullName() + "/" + child.getFullName());

View File

@ -22,15 +22,15 @@
<project.build.jdk>${java.version}</project.build.jdk>
<!-- 基础通用模块依赖,快照版本表示正在持续迭代.发布后将同步到maven中央仓库 -->
<!-- https://github.com/hs-web/hsweb-framework -->
<hsweb.framework.version>4.0.20-SNAPSHOT</hsweb.framework.version>
<hsweb.framework.version>4.0.18</hsweb.framework.version>
<!-- https://github.com/hs-web/hsweb-easy-orm -->
<easyorm.version>4.1.5-SNAPSHOT</easyorm.version>
<easyorm.version>4.1.3</easyorm.version>
<!-- https://github.com/jetlinks/jetlinks -->
<jetlinks.version>1.2.5-SNAPSHOT</jetlinks.version>
<jetlinks.version>1.2.4-SNAPSHOT</jetlinks.version>
<!-- https://github.com/hs-web/reactor-excel -->
<reactor.excel.version>1.0.6</reactor.excel.version>
<!-- https://github.com/jetlinks/reactor-ql -->
<reactor.ql.version>1.0.19</reactor.ql.version>
<reactor.ql.version>1.0.18</reactor.ql.version>
<!-- https://github.com/jetlinks/jetlinks-plugin -->
<jetlinks.plugin.version>1.0.3</jetlinks.plugin.version>
<!-- https://github.com/jetlinks/jetlinks-sdk -->