Merge remote-tracking branch 'origin/2.10' into 2.10

This commit is contained in:
zhouhao 2025-06-24 20:38:24 +08:00
commit 084c55bd9a
8 changed files with 146 additions and 145 deletions

View File

@ -28,6 +28,7 @@ import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;
import java.net.InetSocketAddress;
import java.util.Collection;
@ -79,10 +80,10 @@ public class VertxMqttServer implements MqttServer {
if (sink.currentSubscriberCount() <= 0) {
return false;
}
try{
sink.emitNext(connection,Reactors.emitFailureHandler());
return true;
}catch (Throwable ignore){}
try {
return sink.tryEmitNext(connection).isSuccess();
} catch (Throwable ignore) {
}
return false;
}
@ -90,7 +91,7 @@ public class VertxMqttServer implements MqttServer {
boolean anyHandled = emitNext(sink, connection);
for (List<Sinks.Many<MqttConnection>> value : sinks.values()) {
if (value.size() == 0) {
if (value.isEmpty()) {
continue;
}
Sinks.Many<MqttConnection> sink = value.get(ThreadLocalRandom.current().nextInt(value.size()));
@ -114,7 +115,11 @@ public class VertxMqttServer implements MqttServer {
.sinks
.computeIfAbsent(holder, ignore -> new CopyOnWriteArrayList<>());
Sinks.Many<MqttConnection> sink = Reactors.createMany(Integer.MAX_VALUE,true);
Sinks.Many<MqttConnection> sink =
Sinks.unsafe()
.many()
.unicast()
.onBackpressureBuffer(Queues.<MqttConnection>unboundedMultiproducer().get());
sinks.add(sink);

View File

@ -46,28 +46,28 @@ public class DefaultPluginDataIdMapper implements PluginDataIdMapper {
@EventListener
public void handleEvent(EntityCreatedEvent<PluginDataIdMappingEntity> event){
public void handleEvent(EntityCreatedEvent<PluginDataIdMappingEntity> event) {
event.async(
saveMapping(Flux.fromIterable(event.getEntity()))
);
}
@EventListener
public void handleEvent(EntityModifyEvent<PluginDataIdMappingEntity> event){
public void handleEvent(EntityModifyEvent<PluginDataIdMappingEntity> event) {
event.async(
saveMapping(Flux.fromIterable(event.getAfter()))
);
}
@EventListener
public void handleEvent(EntitySavedEvent<PluginDataIdMappingEntity> event){
public void handleEvent(EntitySavedEvent<PluginDataIdMappingEntity> event) {
event.async(
saveMapping(Flux.fromIterable(event.getEntity()))
);
}
@EventListener
public void handleEvent(EntityDeletedEvent<PluginDataIdMappingEntity> event){
public void handleEvent(EntityDeletedEvent<PluginDataIdMappingEntity> event) {
event.async(
removeMapping(Flux.fromIterable(event.getEntity()))
);
@ -120,9 +120,16 @@ public class DefaultPluginDataIdMapper implements PluginDataIdMapper {
public Mono<String> getInternalId(String type,
String pluginId,
String externalId) {
Assert.notNull(externalId,"externalId must not be null");
Assert.notNull(externalId, "externalId must not be null");
return doWithStore(store -> store
.getConfig(createMappingKey(type, pluginId, externalId))
.getConfig(createMappingKey(type, pluginId, externalId),
Mono.defer(() -> repository
.createQuery()
.where(PluginDataIdMappingEntity::getType, type)
.and(PluginDataIdMappingEntity::getPluginId, pluginId)
.and(PluginDataIdMappingEntity::getExternalId, externalId)
.fetchOne()
.map(PluginDataIdMappingEntity::getInternalId)))
.map(Value::asString))
.defaultIfEmpty(externalId);
}
@ -131,9 +138,16 @@ public class DefaultPluginDataIdMapper implements PluginDataIdMapper {
public Mono<String> getExternalId(String type,
String pluginId,
String internalId) {
Assert.notNull(internalId,"internalId must not be null");
Assert.notNull(internalId, "internalId must not be null");
return doWithStore(store -> store
.getConfig(createMappingKey(type, pluginId, internalId))
.getConfig(createMappingKey(type, pluginId, internalId),
Mono.defer(() -> repository
.createQuery()
.where(PluginDataIdMappingEntity::getType, type)
.and(PluginDataIdMappingEntity::getPluginId, pluginId)
.and(PluginDataIdMappingEntity::getInternalId, internalId)
.fetchOne()
.map(PluginDataIdMappingEntity::getExternalId)))
.map(Value::asString))
.defaultIfEmpty(internalId);
}
@ -142,9 +156,9 @@ public class DefaultPluginDataIdMapper implements PluginDataIdMapper {
public Flux<PluginDataMapping> getMappings(String type, String pluginId) {
return repository
.createQuery()
.where(PluginDataIdMappingEntity::getType,type)
.and(PluginDataIdMappingEntity::getPluginId,pluginId)
.where(PluginDataIdMappingEntity::getType, type)
.and(PluginDataIdMappingEntity::getPluginId, pluginId)
.fetch()
.map(entity-> new PluginDataMapping(entity.getExternalId(),entity.getInternalId()));
.map(entity -> new PluginDataMapping(entity.getExternalId(), entity.getInternalId()));
}
}

View File

@ -53,7 +53,7 @@ import java.util.function.Function;
import java.util.regex.Pattern;
@AllArgsConstructor
class TDengineThingDataHelper implements Disposable {
public class TDengineThingDataHelper implements Disposable {
final TDengineOperations operations;

View File

@ -15,6 +15,7 @@
*/
package org.jetlinks.community.timescaledb.thing;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.ezorm.rdb.codec.DateTimeCodec;
import org.hswebframework.ezorm.rdb.metadata.RDBIndexMetadata;
@ -23,7 +24,6 @@ import org.hswebframework.ezorm.rdb.metadata.RDBTableMetadata;
import org.hswebframework.ezorm.rdb.operator.DatabaseOperator;
import org.hswebframework.ezorm.rdb.operator.ddl.TableBuilder;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.community.Interval;
import org.jetlinks.community.things.data.ThingsDataConstants;
import org.jetlinks.community.things.data.operations.ColumnModeDDLOperationsBase;
import org.jetlinks.community.things.data.operations.DataSettings;
@ -44,6 +44,11 @@ public class TimescaleDBColumnModeDDLOperations extends ColumnModeDDLOperationsB
private final TimescaleDBThingsDataProperties properties;
static Set<String> ignoreColumn = Sets.newHashSet(
ThingsDataConstants.COLUMN_ID,
ThingsDataConstants.COLUMN_MESSAGE_ID
);
public TimescaleDBColumnModeDDLOperations(String thingType,
String templateId,
String thingId,
@ -70,6 +75,9 @@ public class TimescaleDBColumnModeDDLOperations extends ColumnModeDDLOperationsB
List<String> partitions = new ArrayList<>();
partitions.add(ThingsDataConstants.COLUMN_THING_ID);
for (PropertyMetadata property : properties) {
if (ignoreColumn.contains(property.getId())) {
continue;
}
builder
.addColumn(property.getId())
.custom(column -> {

View File

@ -15,56 +15,34 @@
*/
package org.jetlinks.community.rule.engine.measurement;
import com.google.common.collect.Maps;
import io.micrometer.core.instrument.MeterRegistry;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
import org.jetlinks.community.micrometer.MeterRegistryManager;
import org.jetlinks.community.rule.engine.alarm.AlarmConstants;
import org.jetlinks.community.rule.engine.entity.AlarmHistoryInfo;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.utils.ConverterUtils;
import org.springframework.context.event.EventListener;
import org.jetlinks.community.rule.engine.service.AlarmHistoryService;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author bestfeng
*/
@Component
public class AlarmRecordMeasurementProvider extends StaticMeasurementProvider {
MeterRegistry registry;
// MeterRegistry registry;
public AlarmRecordMeasurementProvider(MeterRegistryManager registryManager,
TimeSeriesManager timeSeriesManager) {
public AlarmRecordMeasurementProvider(AlarmHistoryService historyService) {
super(AlarmDashboardDefinition.alarm, AlarmObjectDefinition.record);
registry = registryManager.getMeterRegister(AlarmTimeSeriesMetric.alarmStreamMetrics().getId());
addMeasurement(new AlarmRecordTrendMeasurement(timeSeriesManager));
addMeasurement(new AlarmRecordRankMeasurement(timeSeriesManager));
// registry = registryManager.getMeterRegister(AlarmTimeSeriesMetric.alarmStreamMetrics().getId());
addMeasurement(new AlarmRecordTrendMeasurement(historyService));
addMeasurement(new AlarmRecordRankMeasurement(historyService));
}
@EventListener
public void aggAlarmRecord(AlarmHistoryInfo info) {
registry
.counter("record-agg", getTags(info))
.increment();
}
// @EventListener
// public void aggAlarmRecord(AlarmHistoryInfo info) {
// registry
// .counter("record-agg", getTags(info))
// .increment();
// }
public String[] getTags(AlarmHistoryInfo info) {
Map<String, Object> tagMap = Maps.newLinkedHashMap();
// tagMap.put(AlarmConstants.ConfigKey.targetId, info.getTargetId());
//只需要记录targetType,用于统计 设备产品等告警数量.
tagMap.put(AlarmConstants.ConfigKey.targetType, info.getTargetType());
//tagMap.put(AlarmConstants.ConfigKey.targetName, info.getTargetName());
// tagMap.put(AlarmConstants.ConfigKey.alarmConfigId, info.getAlarmConfigId());
tagMap.put(PropertyConstants.creatorId.getKey(), info.getCreatorId());
return ConverterUtils.convertMapToTags(tagMap);
}
}

View File

@ -19,18 +19,17 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.query.Aggregation;
import org.jetlinks.community.timeseries.query.AggregationData;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.StringType;
import org.springframework.util.StringUtils;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.rule.engine.service.AlarmHistoryService;
import org.jetlinks.community.timeseries.query.Aggregation;
import org.jetlinks.community.timeseries.query.AggregationData;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import reactor.core.publisher.Flux;
import java.time.LocalDateTime;
@ -44,22 +43,22 @@ import java.util.Objects;
*/
public class AlarmRecordRankMeasurement extends StaticMeasurement {
TimeSeriesManager timeSeriesManager;
AlarmHistoryService historyService;
public AlarmRecordRankMeasurement(TimeSeriesManager timeSeriesManager) {
public AlarmRecordRankMeasurement(AlarmHistoryService historyService) {
super(MeasurementDefinition.of("rank", "告警记录排名"));
this.timeSeriesManager = timeSeriesManager;
this.historyService = historyService;
addDimension(new AggRecordRankDimension());
}
static ConfigMetadata aggConfigMetadata = new DefaultConfigMetadata()
.add("time", "周期", "例如: 1h,10m,30s", StringType.GLOBAL)
.add("agg", "聚合类型", "count,sum,avg,max,min", StringType.GLOBAL)
.add("format", "时间格式", "如: MM-dd:HH", StringType.GLOBAL)
.add("limit", "最大数据量", "", StringType.GLOBAL)
.add("from", "时间从", "", StringType.GLOBAL)
.add("to", "时间至", "", StringType.GLOBAL);
.add("time", "周期", "例如: 1h,10m,30s", StringType.GLOBAL)
.add("agg", "聚合类型", "count,sum,avg,max,min", StringType.GLOBAL)
.add("format", "时间格式", "如: MM-dd:HH", StringType.GLOBAL)
.add("limit", "最大数据量", "", StringType.GLOBAL)
.add("from", "时间从", "", StringType.GLOBAL)
.add("to", "时间至", "", StringType.GLOBAL);
class AggRecordRankDimension implements MeasurementDimension {
@ -87,49 +86,52 @@ public class AlarmRecordRankMeasurement extends StaticMeasurement {
public AggregationQueryParam createQueryParam(MeasurementParameter parameter) {
AggregationQueryParam aggregationQueryParam = AggregationQueryParam.of();
aggregationQueryParam.setTimeProperty("alarmTime");
String targetType = parameter.getString("targetType").orElse(null);
String targetId = parameter.getString("targetId").orElse(null);
return aggregationQueryParam
.groupBy(parameter.getString("group", "targetId"))
.count("targetId", "count")
.agg("targetId", Aggregation.TOP)
.agg("targetName", Aggregation.TOP)
.filter(query -> query
.when(StringUtils.hasText(targetType), q -> q.and("targetType",targetType))
.when(StringUtils.hasText(targetId), q -> q.and("targetId",targetId))
)
.limit(parameter.getInt("limit").orElse(1))
.from(parameter
.getDate("from")
.orElseGet(() -> Date
.from(LocalDateTime
.now()
.plusDays(-1)
.atZone(ZoneId.systemDefault())
.toInstant())))
.to(parameter.getDate("to").orElse(new Date()));
.groupBy(parameter.getString("group", "targetId"))
.count("targetId", "count")
.agg("targetId", Aggregation.TOP)
.agg("targetName", Aggregation.TOP)
.limit(parameter.getInt("limit").orElse(1))
.from(parameter
.getDate("from")
.orElseGet(() -> Date
.from(LocalDateTime
.now()
.plusDays(-1)
.atZone(ZoneId.systemDefault())
.toInstant())))
.to(parameter.getDate("to").orElse(new Date()));
}
@Override
public Flux<SimpleMeasurementValue> getValue(MeasurementParameter parameter) {
Comparator<AggregationData> comparator;
if (Objects.equals(parameter.getString("order",""), "asc")){
comparator = Comparator.comparingLong(d-> d.getLong("count", 0L));
}else {
comparator = Comparator.<AggregationData>comparingLong(d-> d.getLong("count", 0L)).reversed();
if (Objects.equals(parameter.getString("order", ""), "asc")) {
comparator = Comparator.comparingLong(d -> d.getLong("count", 0L));
} else {
comparator = Comparator.<AggregationData>comparingLong(d -> d.getLong("count", 0L)).reversed();
}
AggregationQueryParam param = createQueryParam(parameter);
return param
.execute(historyService::aggregation)
.groupBy(a -> a.getString("targetId", null))
.flatMap(fluxGroup -> fluxGroup.reduce(AggregationData::merge))
.sort(comparator)
.map(data -> SimpleMeasurementValue.of(new SimpleResult(data), 0))
.take(param.getLimit());
}
return Flux.defer(() -> param
.execute(timeSeriesManager.getService(AlarmTimeSeriesMetric.alarmStreamMetrics())::aggregation)
.groupBy(a -> a.getString("targetId", null))
.flatMap(fluxGroup -> fluxGroup.reduce(AggregationData::merge))
.sort(comparator)
.map(data -> SimpleMeasurementValue.of(new SimpleResult(data), 0))
)
.take(param.getLimit());
public Flux<SimpleMeasurementValue> getValue(AggregationQueryParam param, Comparator<AggregationData> comparator) {
return param
.execute(historyService::aggregation)
.groupBy(a -> a.getString("targetId", null))
.flatMap(fluxGroup -> fluxGroup.reduce(AggregationData::merge))
.sort(comparator)
.map(data -> SimpleMeasurementValue.of(new SimpleResult(data), 0))
.take(param.getLimit());
}
@Getter

View File

@ -15,16 +15,15 @@
*/
package org.jetlinks.community.rule.engine.measurement;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.StringType;
import org.springframework.util.StringUtils;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.rule.engine.service.AlarmHistoryService;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import reactor.core.publisher.Flux;
import java.time.LocalDateTime;
@ -36,23 +35,23 @@ import java.util.Date;
*/
public class AlarmRecordTrendMeasurement extends StaticMeasurement {
TimeSeriesManager timeSeriesManager;
AlarmHistoryService historyService;
public AlarmRecordTrendMeasurement(TimeSeriesManager timeSeriesManager) {
public AlarmRecordTrendMeasurement(AlarmHistoryService historyService) {
super(MeasurementDefinition.of("trend", "告警记录趋势"));
this.timeSeriesManager = timeSeriesManager;
this.historyService = historyService;
addDimension(new AggRecordTrendDimension());
}
static ConfigMetadata aggConfigMetadata = new DefaultConfigMetadata()
.add("alarmConfigId", "告警配置Id", "", StringType.GLOBAL)
.add("time", "周期", "例如: 1h,10m,30s", StringType.GLOBAL)
.add("agg", "聚合类型", "count,sum,avg,max,min", StringType.GLOBAL)
.add("format", "时间格式", "如: MM-dd:HH", StringType.GLOBAL)
.add("limit", "最大数据量", "", StringType.GLOBAL)
.add("from", "时间从", "", StringType.GLOBAL)
.add("to", "时间至", "", StringType.GLOBAL);
.add("alarmConfigId", "告警配置Id", "", StringType.GLOBAL)
.add("time", "周期", "例如: 1h,10m,30s", StringType.GLOBAL)
.add("agg", "聚合类型", "count,sum,avg,max,min", StringType.GLOBAL)
.add("format", "时间格式", "如: MM-dd:HH", StringType.GLOBAL)
.add("limit", "最大数据量", "", StringType.GLOBAL)
.add("from", "时间从", "", StringType.GLOBAL)
.add("to", "时间至", "", StringType.GLOBAL);
class AggRecordTrendDimension implements MeasurementDimension {
@ -78,39 +77,34 @@ public class AlarmRecordTrendMeasurement extends StaticMeasurement {
}
public AggregationQueryParam createQueryParam(MeasurementParameter parameter) {
String targetType = parameter.getString("targetType").orElse(null);
String targetId = parameter.getString("targetId").orElse(null);
return AggregationQueryParam
.of()
.groupBy(parameter.getInterval("time", null),
parameter.getString("format").orElse("MM月dd日 HH时"))
.sum("count", "count")
.filter(query -> query
.when(StringUtils.hasText(targetType), q -> q.and("targetType", targetType))
.when(StringUtils.hasText(targetId), q -> q.and("targetId",targetId))
)
.limit(parameter.getInt("limit").orElse(1))
.from(parameter
.getDate("from")
.orElseGet(() -> Date
.from(LocalDateTime
.now()
.plusDays(-1)
.atZone(ZoneId.systemDefault())
.toInstant())))
.to(parameter.getDate("to").orElse(new Date()));
.of()
.groupBy(parameter.getInterval("time", null),
parameter.getString("format").orElse("MM月dd日 HH时"))
.count("targetId", "count")
.limit(parameter.getInt("limit").orElse(1))
.from(parameter
.getDate("from")
.orElseGet(() -> Date
.from(LocalDateTime
.now()
.plusDays(-1)
.atZone(ZoneId.systemDefault())
.toInstant())))
.to(parameter.getDate("to").orElse(new Date()));
}
@Override
public Flux<SimpleMeasurementValue> getValue(MeasurementParameter parameter) {
AggregationQueryParam param = createQueryParam(parameter);
return Flux.defer(()-> param
.execute(timeSeriesManager.getService(AlarmTimeSeriesMetric.alarmStreamMetrics())::aggregation)
.index((index, data) -> SimpleMeasurementValue.of(
data.getLong("count",0),
data.getString("time",""),
index)))
.take(param.getLimit());
return param
.execute(historyService::aggregation)
.index((index, data) -> SimpleMeasurementValue.of(
data.getLong("count", 0),
data.getString("time", ""),
index))
.take(param.getLimit());
}
}
}

View File

@ -16,8 +16,8 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.locales>zh_CN</project.build.locales>
<spring.boot.version>3.4.3</spring.boot.version>
<spring.framework.version>6.2.3</spring.framework.version>
<spring.boot.version>3.4.7</spring.boot.version>
<spring.framework.version>6.2.8</spring.framework.version>
<java.version>17</java.version>
<project.build.jdk>${java.version}</project.build.jdk>
<maven.compiler.source>${java.version}</maven.compiler.source>
@ -44,7 +44,7 @@
<!-- 第三方依赖版本 -->
<r2dbc.version>Borca-SR2</r2dbc.version>
<hsweb.expands.version>3.0.2</hsweb.expands.version>
<netty.version>4.1.111.Final</netty.version>
<netty.version>4.1.122.Final</netty.version>
<elasticsearch.version>8.15.4</elasticsearch.version>
<elasticsearch7x.version>7.17.26</elasticsearch7x.version>
<californium.version>3.12.0</californium.version>