From deff207dd899d6ffa1a1e571957bc31641a4faae Mon Sep 17 00:00:00 2001 From: PengyuDeng <89559616+PengyuDeng@users.noreply.github.com> Date: Wed, 5 Nov 2025 18:38:52 +0800 Subject: [PATCH] =?UTF-8?q?feat(timescaledb):=20=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E9=85=8D=E7=BD=AETimescaleDB=E5=87=BD=E6=95=B0=E6=89=80?= =?UTF-8?q?=E5=9C=A8=E7=9A=84schema=20(#710)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(timescaledb): 支持配置TimescaleDB函数所在的schema - 在application.yml和application-default.yml中新增TIMESCALEDB_FUNCTION_SCHEMA配置项 - 修改TimescaleDBCreateTableSqlBuilder类,移除构造函数中的schema参数 - 更新创建超表和保留策略的SQL语句,使用functionSchema替代原有schema - 在TimescaleDBTimeSeriesManager中注入TimescaleDBProperties并设置FunctionSchema特性 - 调整时间分组列的创建逻辑,支持传入functionSchema参数 - 引入FunctionSchema类管理TimescaleDB相关的函数位置信息 * 把配置放进RDBDatabaseMetadata里 * 优化 --- .../timescaledb/TimescaleDBProperties.java | 11 +++++++ .../timescaledb/TimescaleDBUtils.java | 4 +-- .../TimescaleDBTimeSeriesConfiguration.java | 6 ++-- .../impl/DefaultTimescaleDBOperations.java | 4 ++- .../TimescaleDBCreateTableSqlBuilder.java | 18 +++++------ .../metadata/TimescaleDBDialectProvider.java | 2 +- .../TimescaleDBPropertiesFeature.java | 32 +++++++++++++++++++ .../TimescaleDBColumnModeQueryOperations.java | 9 ++++-- .../TimescaleDBRowModeQueryOperations.java | 10 +++--- .../TimescaleDBTimeSeriesManager.java | 3 ++ .../TimescaleDBTimeSeriesService.java | 10 +++++- .../main/resources/application-default.yml | 1 + .../src/main/resources/application.yml | 1 + 13 files changed, 88 insertions(+), 23 deletions(-) create mode 100644 jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/metadata/TimescaleDBPropertiesFeature.java diff --git a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/TimescaleDBProperties.java b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/TimescaleDBProperties.java index c3432817..407c46a7 100644 --- a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/TimescaleDBProperties.java +++ b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/TimescaleDBProperties.java @@ -22,6 +22,8 @@ import org.jetlinks.community.timescaledb.impl.DefaultTimescaleDBDataWriter; import org.springframework.boot.autoconfigure.r2dbc.R2dbcProperties; import org.springframework.boot.context.properties.ConfigurationProperties; +import java.util.Objects; + @ConfigurationProperties(prefix = "timescaledb") @Getter @Setter @@ -39,6 +41,15 @@ public class TimescaleDBProperties { //数据库的schema private String schema = "public"; + /** + * TimescaleDB超表函数所在的位置 + */ + private String functionSchema = null; + + public String getFunctionSchema() { + return functionSchema == null ? schema : functionSchema; + } + /** * 写入缓冲区配置 * diff --git a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/TimescaleDBUtils.java b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/TimescaleDBUtils.java index 73df1e77..8a9975c0 100644 --- a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/TimescaleDBUtils.java +++ b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/TimescaleDBUtils.java @@ -40,7 +40,7 @@ public class TimescaleDBUtils { return ThingsDatabaseUtils.createTableName(name); } - public static NativeSelectColumn createTimeGroupColumn(long startWith, Interval interval) { + public static NativeSelectColumn createTimeGroupColumn(long startWith, Interval interval, String functionSchema) { String unit = interval.getNumber().intValue() + " " + interval .getUnit() @@ -48,7 +48,7 @@ public class TimescaleDBUtils { .toLowerCase(); return NativeSelectColumn - .of("time_bucket('" + unit + "',timestamp)"); + .of("\"" + functionSchema + "\"" + ".time_bucket('" + unit + "',timestamp)"); } public static TimeSeriesData convertToTimeSeriesData(Record record) { diff --git a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/configuration/TimescaleDBTimeSeriesConfiguration.java b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/configuration/TimescaleDBTimeSeriesConfiguration.java index 167c2266..7353ad5e 100644 --- a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/configuration/TimescaleDBTimeSeriesConfiguration.java +++ b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/configuration/TimescaleDBTimeSeriesConfiguration.java @@ -16,6 +16,7 @@ package org.jetlinks.community.timescaledb.configuration; import org.jetlinks.community.timescaledb.TimescaleDBOperations; +import org.jetlinks.community.timescaledb.TimescaleDBProperties; import org.jetlinks.community.timescaledb.timeseries.TimescaleDBTimeSeriesManager; import org.jetlinks.community.timescaledb.timeseries.TimescaleDBTimeSeriesProperties; import org.springframework.boot.autoconfigure.AutoConfiguration; @@ -35,8 +36,9 @@ public class TimescaleDBTimeSeriesConfiguration { @Bean @Primary public TimescaleDBTimeSeriesManager timescaleDBTimeSeriesManager(TimescaleDBOperations operations, - TimescaleDBTimeSeriesProperties properties) { - return new TimescaleDBTimeSeriesManager(properties, operations); + TimescaleDBTimeSeriesProperties properties, + TimescaleDBProperties timescaleDBProperties) { + return new TimescaleDBTimeSeriesManager(properties, operations,timescaleDBProperties); } diff --git a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/impl/DefaultTimescaleDBOperations.java b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/impl/DefaultTimescaleDBOperations.java index 4fcf9bd8..36064cb1 100644 --- a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/impl/DefaultTimescaleDBOperations.java +++ b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/impl/DefaultTimescaleDBOperations.java @@ -31,6 +31,7 @@ import org.jetlinks.community.timescaledb.TimescaleDBDataWriter; import org.jetlinks.community.timescaledb.TimescaleDBOperations; import org.jetlinks.community.timescaledb.TimescaleDBProperties; import org.jetlinks.community.timescaledb.metadata.TimescaleDBDialectProvider; +import org.jetlinks.community.timescaledb.metadata.TimescaleDBPropertiesFeature; import org.springframework.beans.BeansException; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; @@ -64,7 +65,7 @@ public class DefaultTimescaleDBOperations implements TimescaleDBOperations, Appl RDBDatabaseMetadata database = new RDBDatabaseMetadata(Dialect.POSTGRES); database.addFeature(sqlExecutor); database.addFeature(ReactiveSyncSqlExecutor.of(sqlExecutor)); - + database.addFeature(TimescaleDBPropertiesFeature.of(properties)); RDBSchemaMetadata schema = TimescaleDBDialectProvider.GLOBAL.createSchema(properties.getSchema()); database.addSchema(schema); database.setCurrentSchema(schema); @@ -91,6 +92,7 @@ public class DefaultTimescaleDBOperations implements TimescaleDBOperations, Appl .create("TimescaleDB", datasource); disposable.add(dataSource); database = dataSource.operator(); + database.getMetadata().addFeature(TimescaleDBPropertiesFeature.of(properties)); } writer = new DefaultTimescaleDBDataWriter(database, properties.getWriteBuffer()); writer.init(); diff --git a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/metadata/TimescaleDBCreateTableSqlBuilder.java b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/metadata/TimescaleDBCreateTableSqlBuilder.java index a1c368d5..cb152b3e 100644 --- a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/metadata/TimescaleDBCreateTableSqlBuilder.java +++ b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/metadata/TimescaleDBCreateTableSqlBuilder.java @@ -23,16 +23,11 @@ import org.hswebframework.ezorm.rdb.operator.builder.fragments.ddl.CommonCreateT public class TimescaleDBCreateTableSqlBuilder extends CommonCreateTableSqlBuilder { - private String schema; - - public TimescaleDBCreateTableSqlBuilder(String schema) { - this.schema = schema; - } - @Override public SqlRequest build(RDBTableMetadata table) { DefaultBatchSqlRequest sqlRequest = (DefaultBatchSqlRequest) super.build(table); + table.getFeature(CreateHypertable.ID) .ifPresent(createHypertable -> sqlRequest.addBatch(createCreateHypertableSQL(table, createHypertable))); @@ -47,9 +42,12 @@ public class TimescaleDBCreateTableSqlBuilder extends CommonCreateTableSqlBuilde String interval = createHypertable.getInterval().getNumber().intValue() + " " + createHypertable.getInterval().getUnit().name().toLowerCase(); + String functionSchema = table.findFeatureNow(TimescaleDBPropertiesFeature.ID) + .getProperties() + .getFunctionSchema(); return SqlRequests.of( - "SELECT "+ schema +".add_retention_policy( ? , INTERVAL '" + interval + "')", + "SELECT " + "\"" + functionSchema + "\"" + ".add_retention_policy( ? , INTERVAL '" + interval + "')", table.getFullName() ); } @@ -58,9 +56,11 @@ public class TimescaleDBCreateTableSqlBuilder extends CommonCreateTableSqlBuilde String interval = createHypertable.getChunkTimeInterval().getNumber().intValue() + " " + createHypertable.getChunkTimeInterval().getUnit().name().toLowerCase(); - + String functionSchema = table.findFeatureNow(TimescaleDBPropertiesFeature.ID) + .getProperties() + .getFunctionSchema(); return SqlRequests.of( - "SELECT "+ schema +".create_hypertable( ? , ? , chunk_time_interval => INTERVAL '" + interval + "')", + "SELECT " + "\"" + functionSchema + "\"" + ".create_hypertable( ? , ? , chunk_time_interval => INTERVAL '" + interval + "')", table.getFullName(), table.getColumnNow(createHypertable.getColumn()).getName() ); diff --git a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/metadata/TimescaleDBDialectProvider.java b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/metadata/TimescaleDBDialectProvider.java index dd7c06ae..2d3a185e 100644 --- a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/metadata/TimescaleDBDialectProvider.java +++ b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/metadata/TimescaleDBDialectProvider.java @@ -49,7 +49,7 @@ public class TimescaleDBDialectProvider implements DialectProvider { @Override public RDBSchemaMetadata createSchema(String name) { PostgresqlSchemaMetadata schema = new PostgresqlSchemaMetadata(name); - schema.addFeature(new TimescaleDBCreateTableSqlBuilder(name)); + schema.addFeature(new TimescaleDBCreateTableSqlBuilder()); schema.addFeature(new TimescaleDBAlterTableSqlBuilder()); DefaultValueCodecFactory codecFactory = new DefaultValueCodecFactory(); codecFactory diff --git a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/metadata/TimescaleDBPropertiesFeature.java b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/metadata/TimescaleDBPropertiesFeature.java new file mode 100644 index 00000000..575d543d --- /dev/null +++ b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/metadata/TimescaleDBPropertiesFeature.java @@ -0,0 +1,32 @@ +package org.jetlinks.community.timescaledb.metadata; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.hswebframework.ezorm.core.FeatureId; +import org.hswebframework.ezorm.core.FeatureType; +import org.hswebframework.ezorm.core.meta.Feature; +import org.jetlinks.community.timescaledb.TimescaleDBProperties; + +@AllArgsConstructor(staticName = "of") +@Getter +public class TimescaleDBPropertiesFeature implements Feature, FeatureType { + + public static final FeatureId ID = FeatureId.of("TimescaleDBPropertiesFeature"); + + private final TimescaleDBProperties properties; + + @Override + public String getId() { + return ID.getId(); + } + + @Override + public String getName() { + return "TimescaleDBPropertiesFeature"; + } + + @Override + public FeatureType getType() { + return this; + } +} diff --git a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/thing/TimescaleDBColumnModeQueryOperations.java b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/thing/TimescaleDBColumnModeQueryOperations.java index 416d5314..84da40b8 100644 --- a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/thing/TimescaleDBColumnModeQueryOperations.java +++ b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/thing/TimescaleDBColumnModeQueryOperations.java @@ -28,7 +28,7 @@ import org.hswebframework.ezorm.rdb.operator.dml.query.SelectColumn; import org.hswebframework.web.api.crud.entity.PagerResult; import org.hswebframework.web.api.crud.entity.QueryParamEntity; import org.hswebframework.web.crud.query.QueryHelper; -import org.jetlinks.core.metadata.EventMetadata; +import org.jetlinks.community.timescaledb.metadata.TimescaleDBPropertiesFeature; import org.jetlinks.core.things.ThingsRegistry; import org.jetlinks.community.things.data.AggregationRequest; import org.jetlinks.community.things.data.PropertyAggregation; @@ -37,7 +37,6 @@ import org.jetlinks.community.things.data.ThingsDataUtils; import org.jetlinks.community.things.data.operations.ColumnModeQueryOperationsBase; import org.jetlinks.community.things.data.operations.DataSettings; import org.jetlinks.community.things.data.operations.MetricBuilder; -import org.jetlinks.community.things.data.operations.RowModeQueryOperationsBase; import org.jetlinks.community.timescaledb.TimescaleDBUtils; import org.jetlinks.community.timeseries.TimeSeriesData; import org.jetlinks.community.timeseries.query.Aggregation; @@ -122,7 +121,11 @@ public class TimescaleDBColumnModeQueryOperations extends ColumnModeQueryOperati if (request.getInterval() != null) { NativeSelectColumn column = createTimeGroupColumn( request.getFrom().getTime(), - request.getInterval() + request.getInterval(), + database.getMetadata() + .getFeatureNow(TimescaleDBPropertiesFeature.ID) + .getProperties() + .getFunctionSchema() ); query.groupBy(column); query.select(column); diff --git a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/thing/TimescaleDBRowModeQueryOperations.java b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/thing/TimescaleDBRowModeQueryOperations.java index 265a0288..3ca5755f 100644 --- a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/thing/TimescaleDBRowModeQueryOperations.java +++ b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/thing/TimescaleDBRowModeQueryOperations.java @@ -28,7 +28,7 @@ import org.hswebframework.ezorm.rdb.operator.dml.query.SelectColumn; import org.hswebframework.web.api.crud.entity.PagerResult; import org.hswebframework.web.api.crud.entity.QueryParamEntity; import org.hswebframework.web.crud.query.QueryHelper; -import org.jetlinks.core.metadata.EventMetadata; +import org.jetlinks.community.timescaledb.metadata.TimescaleDBPropertiesFeature; import org.jetlinks.core.things.ThingsRegistry; import org.jetlinks.community.things.data.AggregationRequest; import org.jetlinks.community.things.data.PropertyAggregation; @@ -49,8 +49,6 @@ import reactor.core.publisher.Mono; import java.util.*; import java.util.function.Function; -import static org.jetlinks.community.timescaledb.thing.TimescaleDBColumnModeQueryOperations.doAggregation0; - @Slf4j public class TimescaleDBRowModeQueryOperations extends RowModeQueryOperationsBase { private final DatabaseOperator database; @@ -117,7 +115,11 @@ public class TimescaleDBRowModeQueryOperations extends RowModeQueryOperationsBas if (request.getInterval() != null) { NativeSelectColumn column = TimescaleDBUtils.createTimeGroupColumn( request.getFrom().getTime(), - request.getInterval() + request.getInterval(), + database.getMetadata() + .getFeatureNow(TimescaleDBPropertiesFeature.ID) + .getProperties() + .getFunctionSchema() ); query.groupBy(column); diff --git a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/timeseries/TimescaleDBTimeSeriesManager.java b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/timeseries/TimescaleDBTimeSeriesManager.java index 7573be60..792d51c2 100644 --- a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/timeseries/TimescaleDBTimeSeriesManager.java +++ b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/timeseries/TimescaleDBTimeSeriesManager.java @@ -20,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.hswebframework.ezorm.rdb.codec.DateTimeCodec; import org.hswebframework.ezorm.rdb.metadata.RDBIndexMetadata; import org.hswebframework.ezorm.rdb.operator.ddl.TableBuilder; +import org.jetlinks.community.timescaledb.TimescaleDBProperties; import org.jetlinks.core.metadata.PropertyMetadata; import org.jetlinks.community.Interval; import org.jetlinks.community.things.data.ThingsDataConstants; @@ -48,6 +49,8 @@ public class TimescaleDBTimeSeriesManager implements TimeSeriesManager { private final TimescaleDBOperations operations; + private final TimescaleDBProperties timescaleDBProperties; + @Override public TimeSeriesService getService(TimeSeriesMetric metric) { return getService(metric.getId()); diff --git a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/timeseries/TimescaleDBTimeSeriesService.java b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/timeseries/TimescaleDBTimeSeriesService.java index 7f4d7a42..9995de8d 100644 --- a/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/timeseries/TimescaleDBTimeSeriesService.java +++ b/jetlinks-components/timescaledb-component/src/main/java/org/jetlinks/community/timescaledb/timeseries/TimescaleDBTimeSeriesService.java @@ -24,6 +24,7 @@ import org.hswebframework.ezorm.rdb.operator.dml.query.NativeSelectColumn; import org.hswebframework.ezorm.rdb.operator.dml.query.SelectColumn; import org.hswebframework.web.bean.FastBeanCopier; import org.hswebframework.web.id.IDGenerator; +import org.jetlinks.community.timescaledb.metadata.TimescaleDBPropertiesFeature; import org.jetlinks.core.utils.Reactors; import org.jetlinks.community.things.data.ThingsDataConstants; import org.jetlinks.community.timescaledb.TimescaleDBOperations; @@ -113,7 +114,14 @@ public class TimescaleDBTimeSeriesService implements TimeSeriesService { for (Group group : groups) { if (group instanceof TimeGroup) { _timeGroup = ((TimeGroup) group); - NativeSelectColumn column = TimescaleDBUtils.createTimeGroupColumn(startWith, _timeGroup.getInterval()); + NativeSelectColumn column = TimescaleDBUtils.createTimeGroupColumn(startWith, + _timeGroup.getInterval(), + operations + .database() + .getMetadata() + .getFeatureNow(TimescaleDBPropertiesFeature.ID) + .getProperties() + .getFunctionSchema()); column.setColumn(ThingsDataConstants.COLUMN_TIMESTAMP); column.setAlias(group.getAlias()); query.select(column); diff --git a/jetlinks-standalone/src/main/resources/application-default.yml b/jetlinks-standalone/src/main/resources/application-default.yml index 21bf4701..71a3e80e 100644 --- a/jetlinks-standalone/src/main/resources/application-default.yml +++ b/jetlinks-standalone/src/main/resources/application-default.yml @@ -16,6 +16,7 @@ REDIS_DATABASE: 0 # redis 数据库索引 # timescalbedb相关配置 TIMESCALEDB_SCHEMA: public # timescaledb 数据库schema,默认public +TIMESCALEDB_FUNCTION_SCHEMA: public # 时序数据所用到相关函数所在的schema # elasticsearch相关配置 diff --git a/jetlinks-standalone/src/main/resources/application.yml b/jetlinks-standalone/src/main/resources/application.yml index 1344ff7e..929aa144 100644 --- a/jetlinks-standalone/src/main/resources/application.yml +++ b/jetlinks-standalone/src/main/resources/application.yml @@ -61,6 +61,7 @@ timescaledb: # username: postgres # password: p@ssw0rd schema: ${TIMESCALEDB_SCHEMA:${easyorm.default-schema}} # timescaledb的schema,默认public + function-schema: ${TIMESCALEDB_FUNCTION_SCHEMA:${timescaledb.schema}} #时序数据所用到相关函数所在的schema time-series: enabled: true retention-policies: