feat(timescaledb): 支持配置TimescaleDB函数所在的schema (#710)
* feat(timescaledb): 支持配置TimescaleDB函数所在的schema - 在application.yml和application-default.yml中新增TIMESCALEDB_FUNCTION_SCHEMA配置项 - 修改TimescaleDBCreateTableSqlBuilder类,移除构造函数中的schema参数 - 更新创建超表和保留策略的SQL语句,使用functionSchema替代原有schema - 在TimescaleDBTimeSeriesManager中注入TimescaleDBProperties并设置FunctionSchema特性 - 调整时间分组列的创建逻辑,支持传入functionSchema参数 - 引入FunctionSchema类管理TimescaleDB相关的函数位置信息 * 把配置放进RDBDatabaseMetadata里 * 优化
This commit is contained in:
parent
4d7d40696d
commit
deff207dd8
|
|
@ -22,6 +22,8 @@ import org.jetlinks.community.timescaledb.impl.DefaultTimescaleDBDataWriter;
|
|||
import org.springframework.boot.autoconfigure.r2dbc.R2dbcProperties;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
@ConfigurationProperties(prefix = "timescaledb")
|
||||
@Getter
|
||||
@Setter
|
||||
|
|
@ -39,6 +41,15 @@ public class TimescaleDBProperties {
|
|||
//数据库的schema
|
||||
private String schema = "public";
|
||||
|
||||
/**
|
||||
* TimescaleDB超表函数所在的位置
|
||||
*/
|
||||
private String functionSchema = null;
|
||||
|
||||
public String getFunctionSchema() {
|
||||
return functionSchema == null ? schema : functionSchema;
|
||||
}
|
||||
|
||||
/**
|
||||
* 写入缓冲区配置
|
||||
*
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ public class TimescaleDBUtils {
|
|||
return ThingsDatabaseUtils.createTableName(name);
|
||||
}
|
||||
|
||||
public static NativeSelectColumn createTimeGroupColumn(long startWith, Interval interval) {
|
||||
public static NativeSelectColumn createTimeGroupColumn(long startWith, Interval interval, String functionSchema) {
|
||||
|
||||
String unit = interval.getNumber().intValue() + " " + interval
|
||||
.getUnit()
|
||||
|
|
@ -48,7 +48,7 @@ public class TimescaleDBUtils {
|
|||
.toLowerCase();
|
||||
|
||||
return NativeSelectColumn
|
||||
.of("time_bucket('" + unit + "',timestamp)");
|
||||
.of("\"" + functionSchema + "\"" + ".time_bucket('" + unit + "',timestamp)");
|
||||
}
|
||||
|
||||
public static TimeSeriesData convertToTimeSeriesData(Record record) {
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@
|
|||
package org.jetlinks.community.timescaledb.configuration;
|
||||
|
||||
import org.jetlinks.community.timescaledb.TimescaleDBOperations;
|
||||
import org.jetlinks.community.timescaledb.TimescaleDBProperties;
|
||||
import org.jetlinks.community.timescaledb.timeseries.TimescaleDBTimeSeriesManager;
|
||||
import org.jetlinks.community.timescaledb.timeseries.TimescaleDBTimeSeriesProperties;
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
|
|
@ -35,8 +36,9 @@ public class TimescaleDBTimeSeriesConfiguration {
|
|||
@Bean
|
||||
@Primary
|
||||
public TimescaleDBTimeSeriesManager timescaleDBTimeSeriesManager(TimescaleDBOperations operations,
|
||||
TimescaleDBTimeSeriesProperties properties) {
|
||||
return new TimescaleDBTimeSeriesManager(properties, operations);
|
||||
TimescaleDBTimeSeriesProperties properties,
|
||||
TimescaleDBProperties timescaleDBProperties) {
|
||||
return new TimescaleDBTimeSeriesManager(properties, operations,timescaleDBProperties);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ import org.jetlinks.community.timescaledb.TimescaleDBDataWriter;
|
|||
import org.jetlinks.community.timescaledb.TimescaleDBOperations;
|
||||
import org.jetlinks.community.timescaledb.TimescaleDBProperties;
|
||||
import org.jetlinks.community.timescaledb.metadata.TimescaleDBDialectProvider;
|
||||
import org.jetlinks.community.timescaledb.metadata.TimescaleDBPropertiesFeature;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
|
|
@ -64,7 +65,7 @@ public class DefaultTimescaleDBOperations implements TimescaleDBOperations, Appl
|
|||
RDBDatabaseMetadata database = new RDBDatabaseMetadata(Dialect.POSTGRES);
|
||||
database.addFeature(sqlExecutor);
|
||||
database.addFeature(ReactiveSyncSqlExecutor.of(sqlExecutor));
|
||||
|
||||
database.addFeature(TimescaleDBPropertiesFeature.of(properties));
|
||||
RDBSchemaMetadata schema = TimescaleDBDialectProvider.GLOBAL.createSchema(properties.getSchema());
|
||||
database.addSchema(schema);
|
||||
database.setCurrentSchema(schema);
|
||||
|
|
@ -91,6 +92,7 @@ public class DefaultTimescaleDBOperations implements TimescaleDBOperations, Appl
|
|||
.create("TimescaleDB", datasource);
|
||||
disposable.add(dataSource);
|
||||
database = dataSource.operator();
|
||||
database.getMetadata().addFeature(TimescaleDBPropertiesFeature.of(properties));
|
||||
}
|
||||
writer = new DefaultTimescaleDBDataWriter(database, properties.getWriteBuffer());
|
||||
writer.init();
|
||||
|
|
|
|||
|
|
@ -23,16 +23,11 @@ import org.hswebframework.ezorm.rdb.operator.builder.fragments.ddl.CommonCreateT
|
|||
|
||||
public class TimescaleDBCreateTableSqlBuilder extends CommonCreateTableSqlBuilder {
|
||||
|
||||
private String schema;
|
||||
|
||||
public TimescaleDBCreateTableSqlBuilder(String schema) {
|
||||
this.schema = schema;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SqlRequest build(RDBTableMetadata table) {
|
||||
DefaultBatchSqlRequest sqlRequest = (DefaultBatchSqlRequest) super.build(table);
|
||||
|
||||
|
||||
table.getFeature(CreateHypertable.ID)
|
||||
.ifPresent(createHypertable -> sqlRequest.addBatch(createCreateHypertableSQL(table, createHypertable)));
|
||||
|
||||
|
|
@ -47,9 +42,12 @@ public class TimescaleDBCreateTableSqlBuilder extends CommonCreateTableSqlBuilde
|
|||
|
||||
String interval = createHypertable.getInterval().getNumber().intValue() + " "
|
||||
+ createHypertable.getInterval().getUnit().name().toLowerCase();
|
||||
String functionSchema = table.findFeatureNow(TimescaleDBPropertiesFeature.ID)
|
||||
.getProperties()
|
||||
.getFunctionSchema();
|
||||
|
||||
return SqlRequests.of(
|
||||
"SELECT "+ schema +".add_retention_policy( ? , INTERVAL '" + interval + "')",
|
||||
"SELECT " + "\"" + functionSchema + "\"" + ".add_retention_policy( ? , INTERVAL '" + interval + "')",
|
||||
table.getFullName()
|
||||
);
|
||||
}
|
||||
|
|
@ -58,9 +56,11 @@ public class TimescaleDBCreateTableSqlBuilder extends CommonCreateTableSqlBuilde
|
|||
|
||||
String interval = createHypertable.getChunkTimeInterval().getNumber().intValue() + " "
|
||||
+ createHypertable.getChunkTimeInterval().getUnit().name().toLowerCase();
|
||||
|
||||
String functionSchema = table.findFeatureNow(TimescaleDBPropertiesFeature.ID)
|
||||
.getProperties()
|
||||
.getFunctionSchema();
|
||||
return SqlRequests.of(
|
||||
"SELECT "+ schema +".create_hypertable( ? , ? , chunk_time_interval => INTERVAL '" + interval + "')",
|
||||
"SELECT " + "\"" + functionSchema + "\"" + ".create_hypertable( ? , ? , chunk_time_interval => INTERVAL '" + interval + "')",
|
||||
table.getFullName(),
|
||||
table.getColumnNow(createHypertable.getColumn()).getName()
|
||||
);
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ public class TimescaleDBDialectProvider implements DialectProvider {
|
|||
@Override
|
||||
public RDBSchemaMetadata createSchema(String name) {
|
||||
PostgresqlSchemaMetadata schema = new PostgresqlSchemaMetadata(name);
|
||||
schema.addFeature(new TimescaleDBCreateTableSqlBuilder(name));
|
||||
schema.addFeature(new TimescaleDBCreateTableSqlBuilder());
|
||||
schema.addFeature(new TimescaleDBAlterTableSqlBuilder());
|
||||
DefaultValueCodecFactory codecFactory = new DefaultValueCodecFactory();
|
||||
codecFactory
|
||||
|
|
|
|||
|
|
@ -0,0 +1,32 @@
|
|||
package org.jetlinks.community.timescaledb.metadata;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import org.hswebframework.ezorm.core.FeatureId;
|
||||
import org.hswebframework.ezorm.core.FeatureType;
|
||||
import org.hswebframework.ezorm.core.meta.Feature;
|
||||
import org.jetlinks.community.timescaledb.TimescaleDBProperties;
|
||||
|
||||
@AllArgsConstructor(staticName = "of")
|
||||
@Getter
|
||||
public class TimescaleDBPropertiesFeature implements Feature, FeatureType {
|
||||
|
||||
public static final FeatureId<TimescaleDBPropertiesFeature> ID = FeatureId.of("TimescaleDBPropertiesFeature");
|
||||
|
||||
private final TimescaleDBProperties properties;
|
||||
|
||||
@Override
|
||||
public String getId() {
|
||||
return ID.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "TimescaleDBPropertiesFeature";
|
||||
}
|
||||
|
||||
@Override
|
||||
public FeatureType getType() {
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
@ -28,7 +28,7 @@ import org.hswebframework.ezorm.rdb.operator.dml.query.SelectColumn;
|
|||
import org.hswebframework.web.api.crud.entity.PagerResult;
|
||||
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
|
||||
import org.hswebframework.web.crud.query.QueryHelper;
|
||||
import org.jetlinks.core.metadata.EventMetadata;
|
||||
import org.jetlinks.community.timescaledb.metadata.TimescaleDBPropertiesFeature;
|
||||
import org.jetlinks.core.things.ThingsRegistry;
|
||||
import org.jetlinks.community.things.data.AggregationRequest;
|
||||
import org.jetlinks.community.things.data.PropertyAggregation;
|
||||
|
|
@ -37,7 +37,6 @@ import org.jetlinks.community.things.data.ThingsDataUtils;
|
|||
import org.jetlinks.community.things.data.operations.ColumnModeQueryOperationsBase;
|
||||
import org.jetlinks.community.things.data.operations.DataSettings;
|
||||
import org.jetlinks.community.things.data.operations.MetricBuilder;
|
||||
import org.jetlinks.community.things.data.operations.RowModeQueryOperationsBase;
|
||||
import org.jetlinks.community.timescaledb.TimescaleDBUtils;
|
||||
import org.jetlinks.community.timeseries.TimeSeriesData;
|
||||
import org.jetlinks.community.timeseries.query.Aggregation;
|
||||
|
|
@ -122,7 +121,11 @@ public class TimescaleDBColumnModeQueryOperations extends ColumnModeQueryOperati
|
|||
if (request.getInterval() != null) {
|
||||
NativeSelectColumn column = createTimeGroupColumn(
|
||||
request.getFrom().getTime(),
|
||||
request.getInterval()
|
||||
request.getInterval(),
|
||||
database.getMetadata()
|
||||
.getFeatureNow(TimescaleDBPropertiesFeature.ID)
|
||||
.getProperties()
|
||||
.getFunctionSchema()
|
||||
);
|
||||
query.groupBy(column);
|
||||
query.select(column);
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ import org.hswebframework.ezorm.rdb.operator.dml.query.SelectColumn;
|
|||
import org.hswebframework.web.api.crud.entity.PagerResult;
|
||||
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
|
||||
import org.hswebframework.web.crud.query.QueryHelper;
|
||||
import org.jetlinks.core.metadata.EventMetadata;
|
||||
import org.jetlinks.community.timescaledb.metadata.TimescaleDBPropertiesFeature;
|
||||
import org.jetlinks.core.things.ThingsRegistry;
|
||||
import org.jetlinks.community.things.data.AggregationRequest;
|
||||
import org.jetlinks.community.things.data.PropertyAggregation;
|
||||
|
|
@ -49,8 +49,6 @@ import reactor.core.publisher.Mono;
|
|||
import java.util.*;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.jetlinks.community.timescaledb.thing.TimescaleDBColumnModeQueryOperations.doAggregation0;
|
||||
|
||||
@Slf4j
|
||||
public class TimescaleDBRowModeQueryOperations extends RowModeQueryOperationsBase {
|
||||
private final DatabaseOperator database;
|
||||
|
|
@ -117,7 +115,11 @@ public class TimescaleDBRowModeQueryOperations extends RowModeQueryOperationsBas
|
|||
if (request.getInterval() != null) {
|
||||
NativeSelectColumn column = TimescaleDBUtils.createTimeGroupColumn(
|
||||
request.getFrom().getTime(),
|
||||
request.getInterval()
|
||||
request.getInterval(),
|
||||
database.getMetadata()
|
||||
.getFeatureNow(TimescaleDBPropertiesFeature.ID)
|
||||
.getProperties()
|
||||
.getFunctionSchema()
|
||||
);
|
||||
|
||||
query.groupBy(column);
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||
import org.hswebframework.ezorm.rdb.codec.DateTimeCodec;
|
||||
import org.hswebframework.ezorm.rdb.metadata.RDBIndexMetadata;
|
||||
import org.hswebframework.ezorm.rdb.operator.ddl.TableBuilder;
|
||||
import org.jetlinks.community.timescaledb.TimescaleDBProperties;
|
||||
import org.jetlinks.core.metadata.PropertyMetadata;
|
||||
import org.jetlinks.community.Interval;
|
||||
import org.jetlinks.community.things.data.ThingsDataConstants;
|
||||
|
|
@ -48,6 +49,8 @@ public class TimescaleDBTimeSeriesManager implements TimeSeriesManager {
|
|||
|
||||
private final TimescaleDBOperations operations;
|
||||
|
||||
private final TimescaleDBProperties timescaleDBProperties;
|
||||
|
||||
@Override
|
||||
public TimeSeriesService getService(TimeSeriesMetric metric) {
|
||||
return getService(metric.getId());
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import org.hswebframework.ezorm.rdb.operator.dml.query.NativeSelectColumn;
|
|||
import org.hswebframework.ezorm.rdb.operator.dml.query.SelectColumn;
|
||||
import org.hswebframework.web.bean.FastBeanCopier;
|
||||
import org.hswebframework.web.id.IDGenerator;
|
||||
import org.jetlinks.community.timescaledb.metadata.TimescaleDBPropertiesFeature;
|
||||
import org.jetlinks.core.utils.Reactors;
|
||||
import org.jetlinks.community.things.data.ThingsDataConstants;
|
||||
import org.jetlinks.community.timescaledb.TimescaleDBOperations;
|
||||
|
|
@ -113,7 +114,14 @@ public class TimescaleDBTimeSeriesService implements TimeSeriesService {
|
|||
for (Group group : groups) {
|
||||
if (group instanceof TimeGroup) {
|
||||
_timeGroup = ((TimeGroup) group);
|
||||
NativeSelectColumn column = TimescaleDBUtils.createTimeGroupColumn(startWith, _timeGroup.getInterval());
|
||||
NativeSelectColumn column = TimescaleDBUtils.createTimeGroupColumn(startWith,
|
||||
_timeGroup.getInterval(),
|
||||
operations
|
||||
.database()
|
||||
.getMetadata()
|
||||
.getFeatureNow(TimescaleDBPropertiesFeature.ID)
|
||||
.getProperties()
|
||||
.getFunctionSchema());
|
||||
column.setColumn(ThingsDataConstants.COLUMN_TIMESTAMP);
|
||||
column.setAlias(group.getAlias());
|
||||
query.select(column);
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ REDIS_DATABASE: 0 # redis 数据库索引
|
|||
|
||||
# timescalbedb相关配置
|
||||
TIMESCALEDB_SCHEMA: public # timescaledb 数据库schema,默认public
|
||||
TIMESCALEDB_FUNCTION_SCHEMA: public # 时序数据所用到相关函数所在的schema
|
||||
|
||||
|
||||
# elasticsearch相关配置
|
||||
|
|
|
|||
|
|
@ -61,6 +61,7 @@ timescaledb:
|
|||
# username: postgres
|
||||
# password: p@ssw0rd
|
||||
schema: ${TIMESCALEDB_SCHEMA:${easyorm.default-schema}} # timescaledb的schema,默认public
|
||||
function-schema: ${TIMESCALEDB_FUNCTION_SCHEMA:${timescaledb.schema}} #时序数据所用到相关函数所在的schema
|
||||
time-series:
|
||||
enabled: true
|
||||
retention-policies:
|
||||
|
|
|
|||
Loading…
Reference in New Issue