diff --git a/jetlinks-components/pom.xml b/jetlinks-components/pom.xml index 58739c25..a78e51fc 100644 --- a/jetlinks-components/pom.xml +++ b/jetlinks-components/pom.xml @@ -27,6 +27,7 @@ script-component protocol-component relation-component + tdengine-component jetlinks-components diff --git a/jetlinks-components/tdengine-component/docker-compose.yml b/jetlinks-components/tdengine-component/docker-compose.yml new file mode 100755 index 00000000..d560b240 --- /dev/null +++ b/jetlinks-components/tdengine-component/docker-compose.yml @@ -0,0 +1,11 @@ +version: "2" +services: + tdengine: + image: tdengine/tdengine:3.0.0.1 + ports: + - "6030-6040:6030-6040/udp" + - "6030:6030" + - "6035:6035" + - "6041:6041" + environment: + TZ: CST-8 \ No newline at end of file diff --git a/jetlinks-components/tdengine-component/pom.xml b/jetlinks-components/tdengine-component/pom.xml new file mode 100755 index 00000000..a5407e64 --- /dev/null +++ b/jetlinks-components/tdengine-component/pom.xml @@ -0,0 +1,62 @@ + + + + jetlinks-components + org.jetlinks.community + 2.0.0-SNAPSHOT + + 4.0.0 + + tdengine-component + + + + + org.jetlinks.community + timeseries-component + ${project.version} + + + + io.projectreactor.netty + reactor-netty + + + + org.apache.commons + commons-lang3 + + + + com.taosdata.jdbc + taos-jdbcdriver + 3.0.0 + + + + org.hswebframework + hsweb-easy-orm-rdb + + + + + com.zaxxer + HikariCP + + + + org.influxdb + influxdb-java + + + + org.jetlinks.community + things-component + ${project.version} + true + + + + \ No newline at end of file diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/DetectTDengineOperations.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/DetectTDengineOperations.java new file mode 100644 index 00000000..d2afa260 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/DetectTDengineOperations.java @@ -0,0 +1,24 @@ +package org.jetlinks.community.tdengine; + +import lombok.AllArgsConstructor; + +@AllArgsConstructor +public class DetectTDengineOperations implements TDengineOperations { + private final TDEngineDataWriter writer; + private final TDEngineQueryOperations query; + + @Override + public TDEngineDataWriter forWrite() { + return writer; + } + + @Override + public TDEngineQueryOperations forQuery() { + return query; + } + + @Override + public void dispose() { + writer.dispose(); + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/Point.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/Point.java new file mode 100755 index 00000000..e8626976 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/Point.java @@ -0,0 +1,56 @@ +package org.jetlinks.community.tdengine; + +import com.google.common.collect.Maps; +import lombok.*; + +import java.util.Map; + +@Getter +@Setter +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class Point { + + private String metric; + + private String table; + + private Map values = Maps.newLinkedHashMapWithExpectedSize(32); + + private Map tags = Maps.newLinkedHashMapWithExpectedSize(8); + + private long timestamp; + + public Point(String metric, String table) { + this.metric = metric; + this.table = table; + } + + public static Point of(String metric, String table) { + return new Point(metric, table); + } + + public Point timestamp(long timestamp) { + this.timestamp = timestamp; + return this; + } + + public Point tag(String metric, Object value) { + tags.put(metric, value); + return this; + } + + public Point tags(Map values) { + this.tags.putAll(values); + return this; + } + + public Point value(String metric, Object value) { + values.put(metric, value); + return this; + } + + public Point values(Map values) { + this.values.putAll(values); + return this; + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDEngineDataWriter.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDEngineDataWriter.java new file mode 100644 index 00000000..565c9080 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDEngineDataWriter.java @@ -0,0 +1,13 @@ +package org.jetlinks.community.tdengine; + +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface TDEngineDataWriter extends Disposable { + + Mono write(Point point); + + Mono write(Flux points); + +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDEngineQueryOperations.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDEngineQueryOperations.java new file mode 100644 index 00000000..26f9cb02 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDEngineQueryOperations.java @@ -0,0 +1,12 @@ +package org.jetlinks.community.tdengine; + +import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper; +import reactor.core.publisher.Flux; + +import java.util.Map; + +public interface TDEngineQueryOperations { + + Flux query(String sql, ResultWrapper wrapper); + +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDEngineUtils.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDEngineUtils.java new file mode 100644 index 00000000..4f1ca573 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDEngineUtils.java @@ -0,0 +1,49 @@ +package org.jetlinks.community.tdengine; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.reactive.function.client.ClientResponse; +import reactor.core.publisher.Mono; + +@Slf4j +public class TDEngineUtils { + + + public static Mono checkExecuteResult(ClientResponse response) { + if (response.statusCode().isError()) { + return response + .bodyToMono(String.class) + .doOnNext(str -> { + throw new TDengineException(null, str); + }) + .switchIfEmpty(Mono.error(() -> new TDengineException(null, response.statusCode().getReasonPhrase()))) + .then(Mono.empty()); + + } + return response + .bodyToMono(String.class) + .map(json -> { + JSONObject obj = JSON.parseObject(json); + checkExecuteResult(null, obj); + return obj; + }); + + + } + + public static void checkExecuteResult(String sql, JSONObject result) { + if (result.getInteger("code") != 0) { + String error = result.getString("desc"); + if (sql != null && sql.startsWith("describe") && error.contains("does not exist")) { + return; + } + if (sql != null) { + log.warn("execute tdengine sql error [{}]: [{}]", error, sql); + } + + throw new TDengineException(sql, result.getString("desc")); + } + } + +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDengineConfiguration.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDengineConfiguration.java new file mode 100755 index 00000000..83530ef9 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDengineConfiguration.java @@ -0,0 +1,29 @@ +package org.jetlinks.community.tdengine; + + +import org.jetlinks.community.tdengine.restful.RestfulTDEngineQueryOperations; +import org.jetlinks.community.tdengine.restful.SchemalessTDEngineDataWriter; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.web.reactive.function.client.WebClient; + +@AutoConfiguration +@ConditionalOnProperty(prefix = "tdengine", value = "enabled", havingValue = "true") +@EnableConfigurationProperties(TDengineProperties.class) +public class TDengineConfiguration { + + @Bean(destroyMethod = "dispose") + @ConditionalOnMissingBean(TDengineOperations.class) + public TDengineOperations tDengineOperations(TDengineProperties properties) { + WebClient client = properties.getRestful().createClient(); + SchemalessTDEngineDataWriter writer = new SchemalessTDEngineDataWriter(client, + properties.getDatabase(), + properties.getBuffer()); + + return new DetectTDengineOperations(writer, new RestfulTDEngineQueryOperations(client, properties.getDatabase())); + } + +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDengineConstants.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDengineConstants.java new file mode 100644 index 00000000..ad58fb81 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDengineConstants.java @@ -0,0 +1,14 @@ +package org.jetlinks.community.tdengine; + +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; + +public interface TDengineConstants { + + String COLUMN_IS_TAG = "tag"; + + String COLUMN_IS_TS = "ts"; + + + +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDengineException.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDengineException.java new file mode 100755 index 00000000..faf9a455 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDengineException.java @@ -0,0 +1,22 @@ +package org.jetlinks.community.tdengine; + +import lombok.Generated; +import lombok.Getter; + +@Generated +public class TDengineException extends RuntimeException { + + @Getter + private final String sql; + + @Generated + public TDengineException(String sql, String message) { + super(message); + this.sql = sql; + } + @Generated + public TDengineException(String sql, String message, Throwable cause) { + super(message, cause); + this.sql = sql; + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDengineOperations.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDengineOperations.java new file mode 100644 index 00000000..f343f7b1 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDengineOperations.java @@ -0,0 +1,11 @@ +package org.jetlinks.community.tdengine; + +import reactor.core.Disposable; + +public interface TDengineOperations extends Disposable { + + TDEngineDataWriter forWrite(); + + TDEngineQueryOperations forQuery(); + +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDengineProperties.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDengineProperties.java new file mode 100755 index 00000000..8e2950e2 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/TDengineProperties.java @@ -0,0 +1,129 @@ +package org.jetlinks.community.tdengine; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import io.netty.channel.ChannelOption; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; +import io.netty.util.internal.ThreadLocalRandom; +import lombok.Getter; +import lombok.Setter; +import org.jetlinks.community.buffer.BufferProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.http.client.reactive.ClientHttpConnector; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.http.client.reactive.ReactorResourceFactory; +import org.springframework.util.StringUtils; +import org.springframework.util.unit.DataSize; +import org.springframework.web.reactive.function.client.*; +import org.springframework.web.util.UriComponentsBuilder; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; +import reactor.netty.resources.ConnectionProvider; +import reactor.netty.resources.LoopResources; +import reactor.netty.tcp.TcpClient; + +import javax.sql.DataSource; +import javax.validation.constraints.NotBlank; +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +@Getter +@Setter +@ConfigurationProperties(prefix = "tdengine") +public class TDengineProperties { + + @NotBlank + private String database; + + private Connector connector = Connector.restful; + + private RestfulConnector restful = new RestfulConnector(); + + //缓冲配置 + private Buffer buffer = new Buffer(); + + enum Connector { + restful + } + + @Getter + @Setter + public static class Buffer extends BufferProperties { + private boolean enabled = true; + + public Buffer() { + setFilePath("./data/tdengine-buffer"); + setSize(3000); + } + } + + @Getter + @Setter + public static class RestfulConnector { + + private List endpoints = new ArrayList<>(Collections.singletonList(URI.create("http://localhost:6041/"))); + + private String username = "root"; + + private String password = "taosdata"; + + private int maxConnections = Runtime.getRuntime().availableProcessors() * 8; + + private Duration pendingAcquireTimeout = Duration.ofSeconds(10); + private Duration evictInBackground = Duration.ofSeconds(60); + + private Duration connectionTimeout = Duration.ofSeconds(5); + + private Duration socketTimeout = Duration.ofSeconds(5); + + private DataSize maxInMemorySize = DataSize.ofMegabytes(10); + + public URI selectURI() { + // TODO: 2021/6/2 更好的负载均衡方式 + return endpoints.get(ThreadLocalRandom.current().nextInt(endpoints.size())); + } + + public WebClient createClient() { + WebClient.Builder builder = WebClient.builder(); + URI endpoint = endpoints.get(0); + + if (endpoints.size() > 1) { + builder = builder.filter((request, next) -> { + URI target = selectURI(); + if (target.equals(endpoint)) { + return next.exchange(request); + } + URI uri = UriComponentsBuilder + .fromUri(request.url()) + .host(target.getHost()) + .port(target.getPort()) + .build() + .toUri(); + return next + .exchange(ClientRequest + .from(request) + .url(uri) + .build()); + }); + } + + return builder + .codecs(clientCodecConfigurer -> clientCodecConfigurer + .defaultCodecs() + .maxInMemorySize((int) maxInMemorySize.toBytes())) + .defaultHeaders(headers -> { + if (StringUtils.hasText(username)) { + headers.setBasicAuth(username, password); + } + }) + .baseUrl(endpoint.toString()) + .build(); + } + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineAlterTableSqlBuilder.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineAlterTableSqlBuilder.java new file mode 100644 index 00000000..dfe2e224 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineAlterTableSqlBuilder.java @@ -0,0 +1,66 @@ +package org.jetlinks.community.tdengine.metadata; + +import org.hswebframework.ezorm.rdb.executor.DefaultBatchSqlRequest; +import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.PrepareSqlFragments; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.ddl.CommonAlterTableSqlBuilder; +import org.jetlinks.community.tdengine.TDengineConstants; + +import static org.hswebframework.ezorm.rdb.operator.builder.fragments.PrepareSqlFragments.of; + +public class TDengineAlterTableSqlBuilder extends CommonAlterTableSqlBuilder { + + @Override + protected void appendAddColumnCommentSql(DefaultBatchSqlRequest batch, RDBColumnMetadata column) { + + } + + protected PrepareSqlFragments createAlterTable(RDBColumnMetadata column) { + return of() + .addSql("ALTER", "STABLE", column.getOwner().getFullName()); + } + + @Override + protected void appendAddColumnSql(DefaultBatchSqlRequest batch, RDBColumnMetadata column) { + + if (column.getProperty(TDengineConstants.COLUMN_IS_TS).isTrue()) { + return; + } + PrepareSqlFragments fragments = createAlterTable(column); + + fragments + .addSql("ADD", column.getProperty(TDengineConstants.COLUMN_IS_TAG).isTrue() ? "COLUMN" : "TAG") + .addSql(column.getName()) + .addSql(column.getDataType()); + + batch.addBatch(fragments.toRequest()); + } + + @Override + protected void appendDropColumnSql(DefaultBatchSqlRequest batch, RDBColumnMetadata drop) { + if (drop.getProperty(TDengineConstants.COLUMN_IS_TS).isTrue()) { + return; + } + PrepareSqlFragments fragments = createAlterTable(drop); + fragments.addSql("DROP",drop.getProperty(TDengineConstants.COLUMN_IS_TAG).isTrue() ? "COLUMN" : "TAG") + .addSql(drop.getName()); + + batch.addBatch(fragments.toRequest()); + } + + @Override + protected void appendAlterColumnSql(DefaultBatchSqlRequest batch, + RDBColumnMetadata oldColumn, + RDBColumnMetadata newColumn) { + if (newColumn.getProperty(TDengineConstants.COLUMN_IS_TS).isTrue()) { + return; + } + + PrepareSqlFragments fragments = createAlterTable(newColumn); + fragments.addSql("MODIFY",newColumn.getProperty(TDengineConstants.COLUMN_IS_TAG).isTrue() ? "COLUMN" : "TAG") + .addSql(newColumn.getName()) + .addSql(newColumn.getDataType()); + + batch.addBatch(fragments.toRequest()); + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineCreateTableSqlBuilder.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineCreateTableSqlBuilder.java new file mode 100644 index 00000000..1765118f --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineCreateTableSqlBuilder.java @@ -0,0 +1,61 @@ +package org.jetlinks.community.tdengine.metadata; + +import lombok.Getter; +import lombok.Setter; +import org.hswebframework.ezorm.rdb.executor.SqlRequest; +import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata; +import org.hswebframework.ezorm.rdb.metadata.RDBTableMetadata; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.PrepareSqlFragments; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.ddl.CreateTableSqlBuilder; +import org.jetlinks.community.tdengine.TDengineConstants; + +import java.util.ArrayList; +import java.util.List; + +@SuppressWarnings("all") +@Getter +@Setter +public class TDengineCreateTableSqlBuilder implements CreateTableSqlBuilder { + + @Override + public SqlRequest build(RDBTableMetadata table) { + PrepareSqlFragments sql = PrepareSqlFragments.of(); + + List columns = new ArrayList<>(table.getColumns().size()); + sql.addSql("CREATE STABLE IF NOT EXISTS", table.getFullName(), "(") + .addSql("_ts timestamp"); + + + List tags = new ArrayList<>(); + for (RDBColumnMetadata column : table.getColumns()) { + if (column.getProperty(TDengineConstants.COLUMN_IS_TS).isTrue()) { + continue; + } + if (column.getProperty(TDengineConstants.COLUMN_IS_TAG).isTrue()) { + tags.add(column); + continue; + } + sql + .addSql(",") + .addSql(column.getQuoteName()) + .addSql(column.getDataType()); + + } + sql.addSql(")"); + if(!tags.isEmpty()){ + sql.addSql("TAGS ("); + int index= 0 ; + for (RDBColumnMetadata tag : tags) { + if(index++>0){ + sql.addSql(","); + } + sql + .addSql(tag.getQuoteName()) + .addSql(tag.getDataType()); + } + sql.addSql(")"); + } + return sql.toRequest(); + } + +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineDialect.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineDialect.java new file mode 100644 index 00000000..660ab7f4 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineDialect.java @@ -0,0 +1,52 @@ +package org.jetlinks.community.tdengine.metadata; + +import org.hswebframework.ezorm.rdb.metadata.DataType; +import org.hswebframework.ezorm.rdb.metadata.dialect.DefaultDialect; + +import java.math.BigDecimal; +import java.sql.JDBCType; + +public class TDengineDialect extends DefaultDialect { + + public TDengineDialect() { + super(); + registerDataType("decimal", DataType.builder(DataType.jdbc(JDBCType.DECIMAL, BigDecimal.class), + column -> "DOUBLE")); + + registerDataType("numeric", DataType.builder(DataType.jdbc(JDBCType.NUMERIC, BigDecimal.class), + column -> "DOUBLE")); + + registerDataType("number", DataType.builder(DataType.jdbc(JDBCType.DOUBLE, BigDecimal.class), + column -> "DOUBLE")); + + addDataTypeBuilder(JDBCType.VARCHAR,column->"varchar("+column.getLength()+")"); + addDataTypeBuilder(JDBCType.TIMESTAMP,column->"timestamp"); + + + } + + @Override + public String getQuoteStart() { + return "`"; + } + + @Override + public String getQuoteEnd() { + return "`"; + } + + @Override + public boolean isColumnToUpperCase() { + return false; + } + + @Override + public String getId() { + return "tdengine"; + } + + @Override + public String getName() { + return "TDengine"; + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineMetadataParser.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineMetadataParser.java new file mode 100644 index 00000000..ddf7b27c --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineMetadataParser.java @@ -0,0 +1,90 @@ +package org.jetlinks.community.tdengine.metadata; + +import lombok.AllArgsConstructor; +import org.hswebframework.ezorm.core.meta.ObjectMetadata; +import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor; +import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrappers; +import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata; +import org.hswebframework.ezorm.rdb.metadata.RDBSchemaMetadata; +import org.hswebframework.ezorm.rdb.metadata.RDBTableMetadata; +import org.hswebframework.ezorm.rdb.metadata.parser.TableMetadataParser; +import org.jetlinks.community.tdengine.TDengineConstants; +import org.jetlinks.reactor.ql.utils.CastUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +@AllArgsConstructor +public class TDengineMetadataParser implements TableMetadataParser { + + private final RDBSchemaMetadata schema; + + private ReactiveSqlExecutor sql() { + return schema.findFeatureNow(ReactiveSqlExecutor.ID); + } + + @Override + public List parseAllTableName() { + throw new UnsupportedOperationException(); + } + + @Override + public Flux parseAllTableNameReactive() { + return sql() + .select("show stables", ResultWrappers.map()) + .mapNotNull(map -> (String) map.get("stable_name")); + } + + @Override + public boolean tableExists(String name) { + throw new UnsupportedOperationException(); + } + + @Override + public Mono tableExistsReactive(String name) { + return parseAllTableNameReactive() + .hasElement(name); + } + + @Override + public Optional parseByName(String s) { + throw new UnsupportedOperationException(); + } + + @Override + public List parseAll() { + throw new UnsupportedOperationException(); + } + + @Override + public Mono parseByNameReactive(String name) { + RDBTableMetadata table = schema.newTable(name); + return sql() + .select("describe "+table.getFullName(), ResultWrappers.map()) + .doOnNext(column -> table.addColumn(convertToColumn(column))) + .then(Mono.fromSupplier(() -> table.getColumns().isEmpty() ? null : table)); + } + + private RDBColumnMetadata convertToColumn(Map columnInfo) { + String note = (String) columnInfo.getOrDefault("Note", ""); + String column = (String) columnInfo.get("Field"); + String type = (String) columnInfo.get("Type"); + int length = CastUtils.castNumber(columnInfo.get("Length")).intValue(); + + RDBColumnMetadata metadata = new RDBColumnMetadata(); + metadata.setName(column); + metadata.setProperty(TDengineConstants.COLUMN_IS_TAG, "tag".equalsIgnoreCase(note)); + metadata.setLength(length); + metadata.setType(schema.getDialect().convertDataType(type)); + return metadata; + } + + @Override + public Flux parseAllReactive() { + return parseAllTableNameReactive() + .flatMap(this::parseByNameReactive); + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineRestfulSqlExecutor.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineRestfulSqlExecutor.java new file mode 100644 index 00000000..f2003a6e --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineRestfulSqlExecutor.java @@ -0,0 +1,125 @@ +package org.jetlinks.community.tdengine.metadata; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.hswebframework.ezorm.rdb.executor.BatchSqlRequest; +import org.hswebframework.ezorm.rdb.executor.DefaultColumnWrapperContext; +import org.hswebframework.ezorm.rdb.executor.SqlRequest; +import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor; +import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper; +import org.jetlinks.community.tdengine.TDengineException; +import org.jetlinks.core.utils.Reactors; +import org.reactivestreams.Publisher; +import org.springframework.util.CollectionUtils; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.stream.Collectors; + +@Slf4j +@AllArgsConstructor +public class TDengineRestfulSqlExecutor implements ReactiveSqlExecutor { + + private final WebClient client; + + @Override + public Mono update(Publisher request) { + return this + .doExecute(request) + .then(Reactors.ALWAYS_ONE); + } + + @Override + public Mono execute(Publisher request) { + return this + .doExecute(request) + .then(); + } + + @Override + public Flux select(Publisher requests, ResultWrapper wrapper) { + return this + .doExecute(requests) + .flatMap(response -> convertQueryResult(response, wrapper)); + } + + private Flux doExecute(Publisher requests) { + return Flux + .from(requests) + .expand(request -> { + if (request instanceof BatchSqlRequest) { + return Flux.fromIterable(((BatchSqlRequest) request).getBatch()); + } + return Flux.empty(); + }) + .filter(SqlRequest::isNotEmpty) + .concatMap(request -> { + String sql = request.toNativeSql(); + log.trace("Execute ==> {}", sql); + return client + .post() + .uri("/rest/sql") + .bodyValue(sql) + .exchangeToMono(response -> response + .bodyToMono(String.class) + .map(json -> { + JSONObject result = JSON.parseObject(json); + checkExecuteResult(sql, result); + return result; + })); + }); + } + + private void checkExecuteResult(String sql, JSONObject result) { + if (result.getInteger("code") != 0) { + String error = result.getString("desc"); + if (sql.startsWith("describe") && error.contains("does not exist")) { + return; + } + log.warn("execute tdengine sql error [{}]: [{}]", error, sql); + throw new TDengineException(sql, result.getString("desc")); + } + } + + protected Flux convertQueryResult(JSONObject result, ResultWrapper wrapper) { + + JSONArray head = result.getJSONArray("column_meta"); + JSONArray data = result.getJSONArray("data"); + + if (CollectionUtils.isEmpty(head) || CollectionUtils.isEmpty(data)) { + return Flux.empty(); + } + List columns = head.stream() + .map(v-> ((JSONArray) v).getString(0)) + .collect(Collectors.toList()); + + return Flux.create(sink -> { + wrapper.beforeWrap(() -> columns); + + for (Object rowo : data) { + E rowInstance = wrapper.newRowInstance(); + JSONArray row = (JSONArray) rowo; + for (int i = 0; i < columns.size(); i++) { + String property = columns.get(i); + Object value = row.get(i); + DefaultColumnWrapperContext context = new DefaultColumnWrapperContext<>(i, property, value, rowInstance); + wrapper.wrapColumn(context); + rowInstance = context.getRowInstance(); + } + if (!wrapper.completedWrapRow(rowInstance)) { + break; + } + if (rowInstance != null) { + sink.next(rowInstance); + } + } + wrapper.completedWrap(); + sink.complete(); + }); + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineSchema.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineSchema.java new file mode 100644 index 00000000..357ada04 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/metadata/TDengineSchema.java @@ -0,0 +1,15 @@ +package org.jetlinks.community.tdengine.metadata; + +import org.hswebframework.ezorm.rdb.metadata.RDBSchemaMetadata; + +public class TDengineSchema extends RDBSchemaMetadata { + + public TDengineSchema(String name) { + super(name); + addFeature(new TDengineMetadataParser(this)); + addFeature(new TDengineCreateTableSqlBuilder()); + addFeature(new TDengineAlterTableSqlBuilder()); + } + + +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/restful/RestfulTDEngineQueryOperations.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/restful/RestfulTDEngineQueryOperations.java new file mode 100644 index 00000000..7f4af0b6 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/restful/RestfulTDEngineQueryOperations.java @@ -0,0 +1,81 @@ +package org.jetlinks.community.tdengine.restful; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.hswebframework.ezorm.rdb.executor.DefaultColumnWrapperContext; +import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper; +import org.jetlinks.community.tdengine.TDEngineQueryOperations; +import org.springframework.util.CollectionUtils; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.jetlinks.community.tdengine.TDEngineUtils.checkExecuteResult; + +@AllArgsConstructor +@Slf4j +public class RestfulTDEngineQueryOperations implements TDEngineQueryOperations { + + private final WebClient client; + + private final String database; + @Override + public Flux query(String sql, ResultWrapper wrapper) { + log.trace("Execute ==> {}", sql); + return client + .post() + .uri("/rest/sql/"+database) + .bodyValue(sql) + .exchangeToFlux(response -> response + .bodyToMono(String.class) + .flatMapMany(json -> { + JSONObject result = JSON.parseObject(json); + checkExecuteResult(sql, result); + return convertQueryResult(result, wrapper); + })); + } + + protected Flux convertQueryResult(JSONObject result, ResultWrapper wrapper) { + + JSONArray head = result.getJSONArray("column_meta"); + JSONArray data = result.getJSONArray("data"); + + if (CollectionUtils.isEmpty(head) || CollectionUtils.isEmpty(data)) { + return Flux.empty(); + } + List columns = head.stream() + .map(v -> ((JSONArray) v).getString(0)) + .collect(Collectors.toList()); + + return Flux.create(sink -> { + wrapper.beforeWrap(() -> columns); + + for (Object rowo : data) { + E rowInstance = wrapper.newRowInstance(); + JSONArray row = (JSONArray) rowo; + for (int i = 0; i < columns.size(); i++) { + String property = columns.get(i); + Object value = row.get(i); + DefaultColumnWrapperContext context = new DefaultColumnWrapperContext<>(i, property, value, rowInstance); + wrapper.wrapColumn(context); + rowInstance = context.getRowInstance(); + } + if (!wrapper.completedWrapRow(rowInstance)) { + break; + } + if (rowInstance != null) { + sink.next(rowInstance); + } + } + wrapper.completedWrap(); + sink.complete(); + }); + } + + +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/restful/SchemalessTDEngineDataWriter.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/restful/SchemalessTDEngineDataWriter.java new file mode 100644 index 00000000..060c213f --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/restful/SchemalessTDEngineDataWriter.java @@ -0,0 +1,108 @@ +package org.jetlinks.community.tdengine.restful; + +import io.netty.buffer.ByteBufAllocator; +import lombok.AllArgsConstructor; +import org.jetlinks.community.tdengine.TDEngineDataWriter; +import org.jetlinks.community.tdengine.TDengineProperties; +import org.jetlinks.community.buffer.BufferSettings; +import org.jetlinks.community.buffer.PersistenceBuffer; +import org.jetlinks.community.tdengine.Point; +import org.jetlinks.community.tdengine.TDEngineUtils; +import org.jetlinks.community.utils.ErrorUtils; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientException; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@AllArgsConstructor +public class SchemalessTDEngineDataWriter implements TDEngineDataWriter, Disposable { + private final WebClient client; + + private final String database; + + private final DataBufferFactory factory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); + private final PersistenceBuffer buffer; + + public SchemalessTDEngineDataWriter(WebClient client, String database, TDengineProperties.Buffer buffer) { + this.client = client; + this.database = database; + if (buffer.isEnabled()) { + this.buffer = new PersistenceBuffer( + BufferSettings.create("tdengine-writer.queue", buffer), + null, + list -> writeNow(list).thenReturn(false)) + .name("tdengine") + .parallelism(buffer.getParallelism()) + .retryWhenError(e -> ErrorUtils.hasException(e, WebClientException.class) + || ErrorUtils.hasException(e, IOException.class)); + + this.buffer.start(); + } else { + this.buffer = null; + } + } + + @Override + public void dispose() { + if (null != buffer) { + buffer.dispose(); + } + } + + @Override + public Mono write(Point point) { + if (buffer == null) { + return writeNow(Flux.just(convertToLine(point))); + } + buffer.write(convertToLine(point)); + + return Mono.empty(); + } + + @Override + public Mono write(Flux points) { + return writeNow(points.map(this::convertToLine)); + } + + private static final byte[] newLine = "\n".getBytes(); + + private Mono writeNow(Flux lines) { + + return client + .post() + .uri(builder -> builder + .path("/influxdb/v1/write") + .queryParam("db", database) + .build()) + .body(lines + .map(str -> { + byte[] data = str.getBytes(); + return factory + .allocateBuffer(data.length + newLine.length) + .write(data) + .write(newLine); + }) + , DataBuffer.class) + .exchangeToMono(TDEngineUtils::checkExecuteResult) + .then(); + + + } + + private String convertToLine(Point point) { + return org.influxdb.dto.Point.measurement(point.getMetric()) + .tag((Map) point.getTags()) + .fields(point.getValues()) + .time(point.getTimestamp(), TimeUnit.MILLISECONDS) + .build() + .lineProtocol(); + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/term/TDengineQueryConditionBuilder.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/term/TDengineQueryConditionBuilder.java new file mode 100755 index 00000000..d31f6eb9 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/term/TDengineQueryConditionBuilder.java @@ -0,0 +1,35 @@ +package org.jetlinks.community.tdengine.term; + +import org.hswebframework.ezorm.core.param.Term; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.AbstractTermsFragmentBuilder; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments; +import org.springframework.util.CollectionUtils; + +import java.util.List; + +public class TDengineQueryConditionBuilder extends AbstractTermsFragmentBuilder { + + + public static String build(List terms) { + + if(CollectionUtils.isEmpty(terms)){ + return ""; + } + SqlFragments fragments = new TDengineQueryConditionBuilder().createTermFragments(null, terms); + + if(fragments.isEmpty()){ + return ""; + } + + return fragments.toRequest().toString(); + + } + + @Override + protected SqlFragments createTermFragments(Object parameter, Term term) { + String type = term.getTermType(); + TDengineTermType termType = TDengineTermType.valueOf(type.toLowerCase()); + + return SqlFragments.single(termType.build("`"+term.getColumn()+"`", term.getValue())); + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/term/TDengineTermType.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/term/TDengineTermType.java new file mode 100755 index 00000000..1d357ca7 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/term/TDengineTermType.java @@ -0,0 +1,138 @@ +package org.jetlinks.community.tdengine.term; + +import lombok.AllArgsConstructor; +import org.springframework.util.StringUtils; + +import java.util.*; +import java.util.stream.Collectors; + +@AllArgsConstructor +public enum TDengineTermType { + is(true, "="), + eq(true, "="), + not(true, "!="), + notnull(false, "!=") { + @Override + protected void doBuild(String column, Object value, StringJoiner sql) { + String val = String.valueOf(value); + sql.add(escapeColumn(column)) + .add(" is not null "); + } + }, + gt(true, ">"), + gte(true, ">="), + lt(true, "<"), + lte(true, "<="), + like(false, "like") { + @Override + protected void doBuild(String column, Object value, StringJoiner sql) { + String val = String.valueOf(value); + sql.add(escapeColumn(column)) + .add(" like ").add(val); + } + }, + btw(true, "btw") { + @Override + protected void doBuild(String column, Object value, StringJoiner sql) { + List values = new ArrayList<>(convertList(value)); + if (values.isEmpty()) { + return; + } + gte.build(column, values.get(0), sql); + if (values.size() >= 2) { + sql.add(" and "); + lte.build(column, values.get(1), sql); + } + + } + }, + in(false, "in") { + @Override + protected void doBuild(String column, Object value, StringJoiner sql) { + String colSql = escapeColumn(column); + + sql.add(colSql) + .add(" in ") + .add(convertList(value) + .stream() + .map(this::createValue) + .collect(Collectors.joining(" , ", "(", ")"))); + } + }, + nin(false, "nin") { + @Override + protected void doBuild(String column, Object value, StringJoiner sql) { + String colSql = escapeColumn(column); + + sql.add(colSql) + .add(" not in ") + .add(convertList(value) + .stream() + .map(this::createValue) + .collect(Collectors.joining(" , ", "(", ")"))); + } + }; + + + final boolean forNumber; + final String expr; + + public static Collection convertList(Object value) { + if (value == null) { + return Collections.emptyList(); + } + if (value instanceof String) { + value = ((String) value).split("[,]"); + } + + if (value instanceof Object[]) { + value = Arrays.asList(((Object[]) value)); + } + + if (value instanceof Collection) { + return ((Collection) value); + } + + return Collections.singletonList(value); + } + + protected String escapeValue(String value) { + return value.replace("'", "\\'"); + } + + protected String escapeColumn(String value) { + return value; + } + + protected String createValue(Object value) { + String strVal = escapeValue(value.toString()); + if (value instanceof Number || value instanceof Boolean) { + return value.toString(); + } else if (strVal.startsWith("'") && strVal.endsWith("'")) { + return strVal; + } else { + return "'" + strVal + "'"; + } + } + + protected void doBuild(String column, Object value, StringJoiner sql) { + sql.add(escapeColumn(column)) + .add(" ") + .add(expr) + .add(" ").add(createValue(value)); + } + + public String build(String column, Object value) { + StringJoiner joiner = new StringJoiner(""); + build(column, value, joiner); + return joiner.toString(); + } + + public void build(String column, Object value, StringJoiner sql) { + if (StringUtils.isEmpty(column) || value == null) { + return; + } + + doBuild(column, value, sql); + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineColumnModeDDLOperations.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineColumnModeDDLOperations.java new file mode 100644 index 00000000..6227fe63 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineColumnModeDDLOperations.java @@ -0,0 +1,37 @@ +package org.jetlinks.community.tdengine.things; + +import org.jetlinks.core.metadata.PropertyMetadata; +import org.jetlinks.community.things.data.operations.ColumnModeDDLOperationsBase; +import org.jetlinks.community.things.data.operations.DataSettings; +import org.jetlinks.community.things.data.operations.MetricBuilder; +import reactor.core.publisher.Mono; + +import java.util.List; + +class TDengineColumnModeDDLOperations extends ColumnModeDDLOperationsBase { + + private final TDengineThingDataHelper helper; + + public TDengineColumnModeDDLOperations(String thingType, + String templateId, + String thingId, + DataSettings settings, + MetricBuilder metricBuilder, + TDengineThingDataHelper helper) { + super(thingType, templateId, thingId, settings, metricBuilder); + this.helper = helper; + } + + + @Override + protected Mono register(MetricType metricType,String metric, List properties) { + helper.metadataManager.register(metric, properties); + return Mono.empty(); + } + + @Override + protected Mono reload(MetricType metricType,String metric, List properties) { + helper.metadataManager.register(metric, properties); + return Mono.empty(); + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineColumnModeQueryOperations.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineColumnModeQueryOperations.java new file mode 100644 index 00000000..c0ad6986 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineColumnModeQueryOperations.java @@ -0,0 +1,103 @@ +package org.jetlinks.community.tdengine.things; + +import org.hswebframework.ezorm.core.dsl.Query; +import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrappers; +import org.hswebframework.web.api.crud.entity.PagerResult; +import org.hswebframework.web.api.crud.entity.QueryParamEntity; +import org.jetlinks.core.things.ThingsRegistry; +import org.jetlinks.community.things.data.AggregationRequest; +import org.jetlinks.community.things.data.PropertyAggregation; +import org.jetlinks.community.things.data.operations.ColumnModeQueryOperationsBase; +import org.jetlinks.community.things.data.operations.DataSettings; +import org.jetlinks.community.things.data.operations.MetricBuilder; +import org.jetlinks.community.timeseries.TimeSeriesData; +import org.jetlinks.community.timeseries.query.AggregationData; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.Map; +import java.util.StringJoiner; +import java.util.function.Function; + + +class TDengineColumnModeQueryOperations extends ColumnModeQueryOperationsBase { + + final TDengineThingDataHelper helper; + + public TDengineColumnModeQueryOperations(String thingType, + String thingTemplateId, + String thingId, + MetricBuilder metricBuilder, + DataSettings settings, + ThingsRegistry registry, + TDengineThingDataHelper helper) { + super(thingType, thingTemplateId, thingId, metricBuilder, settings, registry); + this.helper = helper; + } + + + @Override + protected Flux doQuery(String metric, Query query) { + return helper.doQuery(metric, query); + } + + @Override + protected Mono> doQueryPage(String metric, + Query query, + Function mapper) { + return helper.doQueryPage(metric, query, mapper); + } + + @Override + protected Flux doAggregation(String metric, + AggregationRequest request, + AggregationContext context) { + StringJoiner joiner = new StringJoiner("", "select ", ""); + joiner.add("last(`_ts`) _ts"); + + for (PropertyAggregation property : context.getProperties()) { + joiner.add(","); + joiner.add(TDengineThingDataHelper.convertAggFunction(property)) + .add("(`").add(property.getProperty()).add("`)") + .add(" `").add(property.getAlias()).add("`"); + } + + joiner + .add(" from `").add(metric).add("` ") + .add(helper.buildWhere( + metric, + request.getFilter().clone().and("_ts", "btw", Arrays.asList(request.getFrom(), request.getTo()))) + ); + + + if (request.getInterval() != null) { + joiner.add(" ") + .add(TDengineThingDataHelper.getGroupByTime(request.getInterval())); + } + joiner.add(TDengineThingDataHelper.buildOrderBy(request.getFilter())); + + String format = request.getFormat(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format); + + return helper + .operations + .forQuery() + .query(joiner.toString(), ResultWrappers.map()) + .map(map -> { + TimeSeriesData timeSeriesData = TDengineThingDataHelper.convertToTsData(map); + long ts = timeSeriesData.getTimestamp(); + Map newData = timeSeriesData.getData(); + for (PropertyAggregation property : context.getProperties()) { + newData.putIfAbsent(property.getAlias(), 0); + } + newData.put("time", formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault()))); + return AggregationData.of(newData); + }) + .take(request.getLimit()); + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineColumnModeSaveOperations.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineColumnModeSaveOperations.java new file mode 100644 index 00000000..12e914c7 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineColumnModeSaveOperations.java @@ -0,0 +1,61 @@ +package org.jetlinks.community.tdengine.things; + +import org.jetlinks.core.message.ThingMessage; +import org.jetlinks.core.things.ThingMetadata; +import org.jetlinks.core.things.ThingsRegistry; +import org.jetlinks.community.things.data.ThingsDataConstants; +import org.jetlinks.community.things.data.operations.ColumnModeSaveOperationsBase; +import org.jetlinks.community.things.data.operations.DataSettings; +import org.jetlinks.community.things.data.operations.MetricBuilder; +import org.jetlinks.community.timeseries.TimeSeriesData; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.*; + + +class TDengineColumnModeSaveOperations extends ColumnModeSaveOperationsBase { + private final TDengineThingDataHelper helper; + + public TDengineColumnModeSaveOperations(ThingsRegistry registry, + MetricBuilder metricBuilder, + DataSettings settings, + TDengineThingDataHelper helper) { + super(registry, metricBuilder, settings); + this.helper = helper; + } + + static Set IGNORE_COLUMN = new HashSet<>(Arrays.asList( + ThingsDataConstants.COLUMN_ID, + ThingsDataConstants.COLUMN_TIMESTAMP + )); + + @Override + protected String createPropertyDataId(ThingMessage message) { + return message.getMessageId(); + } + + @Override + protected Map handlePropertiesData(ThingMetadata metadata, Map properties) { + properties = super.handlePropertiesData(metadata, properties); + IGNORE_COLUMN.forEach(properties::remove); + return properties; + } + + protected boolean isTagValue(String metric, + String key, + Object value) { + return Objects.equals(metricBuilder.getThingIdProperty(), key); + } + + @Override + protected Mono doSave(String metric, TimeSeriesData data) { + + return helper.doSave(metric, data, this::isTagValue); + } + + @Override + protected Mono doSave(String metric, Flux data) { + return helper.doSave(metric, data, this::isTagValue); + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineColumnModeStrategy.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineColumnModeStrategy.java new file mode 100644 index 00000000..fe5d4b8a --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineColumnModeStrategy.java @@ -0,0 +1,65 @@ +package org.jetlinks.community.tdengine.things; + +import org.jetlinks.core.things.ThingsRegistry; +import org.jetlinks.community.things.data.AbstractThingDataRepositoryStrategy; +import org.jetlinks.community.things.data.operations.DDLOperations; +import org.jetlinks.community.things.data.operations.QueryOperations; +import org.jetlinks.community.things.data.operations.SaveOperations; + +public class TDengineColumnModeStrategy extends AbstractThingDataRepositoryStrategy { + + private final ThingsRegistry registry; + private final TDengineThingDataHelper helper; + + public TDengineColumnModeStrategy(ThingsRegistry registry, TDengineThingDataHelper helper) { + this.registry = registry; + this.helper = helper; + } + + @Override + public String getId() { + return "tdengine-column"; + } + + @Override + public String getName() { + return "TDEngine-列式存储"; + } + + @Override + public SaveOperations createOpsForSave(OperationsContext context) { + return new TDengineColumnModeSaveOperations( + registry, + context.getMetricBuilder(), + context.getSettings(), + helper); + } + + @Override + protected QueryOperations createForQuery(String thingType, String templateId, String thingId, OperationsContext context) { + return new TDengineColumnModeQueryOperations( + thingType, + templateId, + thingId, + context.getMetricBuilder(), + context.getSettings(), + registry, + helper); + } + + @Override + protected DDLOperations createForDDL(String thingType, String templateId, String thingId, OperationsContext context) { + return new TDengineColumnModeDDLOperations( + thingType, + templateId, + thingId, + context.getSettings(), + context.getMetricBuilder(), + helper); + } + + @Override + public int getOrder() { + return 10220; + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineRowModeDDLOperations.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineRowModeDDLOperations.java new file mode 100644 index 00000000..4b33914f --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineRowModeDDLOperations.java @@ -0,0 +1,37 @@ +package org.jetlinks.community.tdengine.things; + +import org.jetlinks.core.metadata.PropertyMetadata; +import org.jetlinks.community.things.data.operations.DataSettings; +import org.jetlinks.community.things.data.operations.MetricBuilder; +import org.jetlinks.community.things.data.operations.RowModeDDLOperationsBase; +import reactor.core.publisher.Mono; + +import java.util.List; + +class TDengineRowModeDDLOperations extends RowModeDDLOperationsBase { + + private final TDengineThingDataHelper helper; + + public TDengineRowModeDDLOperations(String thingType, + String templateId, + String thingId, + DataSettings settings, + MetricBuilder metricBuilder, + TDengineThingDataHelper helper) { + super(thingType, templateId, thingId, settings, metricBuilder); + this.helper = helper; + } + + + @Override + protected Mono register(MetricType metricType,String metric, List properties) { + helper.metadataManager.register(metric, properties); + return Mono.empty(); + } + + @Override + protected Mono reload(MetricType metricType,String metric, List properties) { + helper.metadataManager.register(metric, properties); + return Mono.empty(); + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineRowModeQueryOperations.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineRowModeQueryOperations.java new file mode 100644 index 00000000..1d64fe34 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineRowModeQueryOperations.java @@ -0,0 +1,156 @@ +package org.jetlinks.community.tdengine.things; + +import org.hswebframework.ezorm.core.dsl.Query; +import org.hswebframework.ezorm.core.param.TermType; +import org.hswebframework.web.api.crud.entity.PagerResult; +import org.hswebframework.web.api.crud.entity.QueryParamEntity; +import org.jetlinks.core.metadata.PropertyMetadata; +import org.jetlinks.core.things.ThingMetadata; +import org.jetlinks.core.things.ThingsRegistry; +import org.jetlinks.community.things.data.AggregationRequest; +import org.jetlinks.community.things.data.PropertyAggregation; +import org.jetlinks.community.things.data.ThingPropertyDetail; +import org.jetlinks.community.things.data.ThingsDataConstants; +import org.jetlinks.community.things.data.operations.DataSettings; +import org.jetlinks.community.things.data.operations.MetricBuilder; +import org.jetlinks.community.things.data.operations.RowModeQueryOperationsBase; +import org.jetlinks.community.timeseries.TimeSeriesData; +import org.jetlinks.community.timeseries.query.Aggregation; +import org.jetlinks.community.timeseries.query.AggregationData; +import org.jetlinks.reactor.ql.utils.CastUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import javax.annotation.Nonnull; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.function.Function; + +class TDengineRowModeQueryOperations extends RowModeQueryOperationsBase { + + final TDengineThingDataHelper helper; + + public TDengineRowModeQueryOperations(String thingType, + String thingTemplateId, + String thingId, + MetricBuilder metricBuilder, + DataSettings settings, + ThingsRegistry registry, + TDengineThingDataHelper helper) { + super(thingType, thingTemplateId, thingId, metricBuilder, settings, registry); + this.helper = helper; + } + + @Override + protected Flux doQuery(String metric, Query query) { + return helper.doQuery(metric, query); + } + + @Override + protected Mono> doQueryPage(String metric, + Query query, + Function mapper) { + return helper.doQueryPage(metric, query, mapper); + } + + @Override + protected Flux queryEachProperty(@Nonnull String metric, + @Nonnull Query query, + @Nonnull ThingMetadata metadata, + @Nonnull Map properties) { + return super.queryEachProperty(metric,query,metadata,properties); + } + + @Override + protected Flux doAggregation(String metric, + AggregationRequest request, + AggregationContext context) { + PropertyAggregation[] properties = context.getProperties(); + + + //聚合 + StringJoiner agg = new StringJoiner(""); + agg.add("property,last(`_ts`) _ts"); + + for (PropertyAggregation property : properties) { + agg.add(","); + agg.add(TDengineThingDataHelper.convertAggFunction(property)); + if(property.getAgg()== Aggregation.COUNT){ + agg .add("(`value`)"); + }else { + agg .add("(`numberValue`)"); + } + agg.add(" `").add("value_" + property.getAlias()).add("`"); + } + + String sql = String.join( + "", + "`", metric, "` ", + helper.buildWhere(metric, + request + .getFilter() + .clone() + .and("property", TermType.in, context.getPropertyAlias().values()) + .and("_ts", TermType.btw, Arrays.asList(request.getFrom(), request.getTo())) + ) + ); + String dataSql = "select " + agg + " from " + sql + " partition by property"; + if (request.getInterval() != null) { + dataSql += " "; + dataSql += TDengineThingDataHelper.getGroupByTime(request.getInterval()); + } + String format = request.getFormat(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format); + + if (properties.length == 1) { + String key = "value_" + properties[0].getAlias(); + return helper + .query(dataSql) + .sort(Comparator.comparing(TimeSeriesData::getTimestamp).reversed()) + .map(timeSeriesData -> { + long ts = timeSeriesData.getTimestamp(); + Map newData = new HashMap<>(); + newData.put("time", formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId + .systemDefault()))); + newData.put(properties[0].getAlias(), timeSeriesData.get(key).orElse(0)); + + return AggregationData.of(newData); + }) + .take(request.getLimit()) + ; + } + return helper + .query(dataSql) + .map(timeSeriesData -> { + long ts = timeSeriesData.getTimestamp(); + Map newData = timeSeriesData.getData(); + newData.put("time", formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault()))); + newData.put("_time", ts); + return newData; + }) + .groupBy(data -> (String) data.get("time"), Integer.MAX_VALUE) + .flatMap(group -> group + .reduceWith(HashMap::new, (a, b) -> { + a.putAll(b); + return a; + }) + .map(map -> { + Map newResult = new HashMap<>(); + for (PropertyAggregation property : properties) { + String key = "value_" + property.getAlias(); + newResult.put(property.getAlias(), Optional.ofNullable(map.get(key)).orElse(0)); + } + newResult.put("time", group.key()); + newResult.put("_time", map.getOrDefault("_time", new Date())); + return AggregationData.of(newResult); + })) + .sort(Comparator + .comparing(data -> CastUtils.castDate(data.values().get("_time"))) + .reversed()) + .doOnNext(data -> data.values().remove("_time")) + .take(request.getLimit()); + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineRowModeSaveOperations.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineRowModeSaveOperations.java new file mode 100644 index 00000000..fe8f774b --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineRowModeSaveOperations.java @@ -0,0 +1,69 @@ +package org.jetlinks.community.tdengine.things; + +import org.jetlinks.core.message.ThingMessage; +import org.jetlinks.core.metadata.PropertyMetadata; +import org.jetlinks.core.things.ThingsRegistry; +import org.jetlinks.community.things.data.ThingsDataConstants; +import org.jetlinks.community.things.data.operations.DataSettings; +import org.jetlinks.community.things.data.operations.MetricBuilder; +import org.jetlinks.community.things.data.operations.RowModeSaveOperationsBase; +import org.jetlinks.community.timeseries.TimeSeriesData; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.*; + + +class TDengineRowModeSaveOperations extends RowModeSaveOperationsBase { + private final TDengineThingDataHelper helper; + + public TDengineRowModeSaveOperations(ThingsRegistry registry, + MetricBuilder metricBuilder, + DataSettings settings, + TDengineThingDataHelper helper) { + super(registry, metricBuilder, settings); + this.helper = helper; + } + + protected boolean isTagValue(String metric, + String key, + Object value) { + return Objects.equals(metricBuilder.getThingIdProperty(), key) + || Objects.equals(ThingsDataConstants.COLUMN_PROPERTY_ID, key); + } + static Set IGNORE_COLUMN = new HashSet<>(Arrays.asList( + ThingsDataConstants.COLUMN_ID, + ThingsDataConstants.COLUMN_PROPERTY_OBJECT_VALUE, + ThingsDataConstants.COLUMN_PROPERTY_ARRAY_VALUE, + ThingsDataConstants.COLUMN_PROPERTY_GEO_VALUE, + ThingsDataConstants.COLUMN_PROPERTY_TIME_VALUE, + ThingsDataConstants.COLUMN_TIMESTAMP + )); + + @Override + protected String createPropertyDataId(String property, ThingMessage message, long timestamp) { + return String.valueOf(timestamp); + } + + @Override + protected Map createRowPropertyData(String id, + long timestamp, + ThingMessage message, + PropertyMetadata property, + Object value) { + Map data = super.createRowPropertyData(id, timestamp, message, property, value); + IGNORE_COLUMN.forEach(data::remove); + return data; + } + + @Override + protected Mono doSave(String metric, TimeSeriesData data) { + + return helper.doSave(metric, data, this::isTagValue); + } + + @Override + protected Mono doSave(String metric, Flux data) { + return helper.doSave(metric, data, this::isTagValue); + } +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineRowModeStrategy.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineRowModeStrategy.java new file mode 100644 index 00000000..b135ad23 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineRowModeStrategy.java @@ -0,0 +1,78 @@ +package org.jetlinks.community.tdengine.things; + +import org.jetlinks.core.metadata.Feature; +import org.jetlinks.core.metadata.MetadataFeature; +import org.jetlinks.core.things.ThingsRegistry; +import org.jetlinks.community.things.data.AbstractThingDataRepositoryStrategy; +import org.jetlinks.community.things.data.operations.DDLOperations; +import org.jetlinks.community.things.data.operations.QueryOperations; +import org.jetlinks.community.things.data.operations.SaveOperations; +import reactor.core.publisher.Flux; + +public class TDengineRowModeStrategy extends AbstractThingDataRepositoryStrategy { + + private final ThingsRegistry registry; + private final TDengineThingDataHelper helper; + + public TDengineRowModeStrategy(ThingsRegistry registry, TDengineThingDataHelper helper) { + this.registry = registry; + this.helper = helper; + } + + + @Override + public String getId() { + return "tdengine-row"; + } + + @Override + public String getName() { + return "TDengine-行式存储"; + } + + @Override + public SaveOperations createOpsForSave(OperationsContext context) { + return new TDengineRowModeSaveOperations( + registry, + context.getMetricBuilder(), + context.getSettings(), + helper); + } + + @Override + protected DDLOperations createForDDL(String thingType, String templateId, String thingId, OperationsContext context) { + return new TDengineRowModeDDLOperations( + thingType, + templateId, + thingId, + context.getSettings(), + context.getMetricBuilder(), + helper); + } + + @Override + protected QueryOperations createForQuery(String thingType, String templateId, String thingId, OperationsContext context) { + return new TDengineRowModeQueryOperations( + thingType, + templateId, + thingId, + context.getMetricBuilder(), + context.getSettings(), + registry, + helper); + } + + @Override + public Flux getFeatures() { + //事件不支持新增以及修改 + return Flux.just(MetadataFeature.eventNotInsertable, + MetadataFeature.eventNotModifiable + ); + } + + @Override + public int getOrder() { + return 10210; + } + +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineThingDataConfiguration.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineThingDataConfiguration.java new file mode 100644 index 00000000..4a3edd30 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineThingDataConfiguration.java @@ -0,0 +1,42 @@ +package org.jetlinks.community.tdengine.things; + +import org.jetlinks.core.things.ThingsRegistry; +import org.jetlinks.community.tdengine.TDengineConfiguration; +import org.jetlinks.community.tdengine.TDengineOperations; +import org.jetlinks.community.things.data.DefaultMetricMetadataManager; +import org.jetlinks.community.things.data.ThingsDataRepositoryStrategy; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.context.annotation.Bean; + +@AutoConfiguration(after = TDengineConfiguration.class) +@ConditionalOnClass(ThingsDataRepositoryStrategy.class) +@ConditionalOnBean(TDengineOperations.class) +public class TDengineThingDataConfiguration { + + + @Bean(destroyMethod = "dispose") + public TDengineThingDataHelper tDengineThingDataHelper(TDengineOperations operations) { + + return new TDengineThingDataHelper( + operations, + new DefaultMetricMetadataManager() + ); + } + + @Bean + public TDengineColumnModeStrategy tDengineColumnModeStrategy(ThingsRegistry registry, + TDengineThingDataHelper operations) { + + return new TDengineColumnModeStrategy(registry, operations); + } + + @Bean + public TDengineRowModeStrategy tDengineRowModeStrategy(ThingsRegistry registry, + TDengineThingDataHelper operations) { + + return new TDengineRowModeStrategy(registry, operations); + } + +} diff --git a/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineThingDataHelper.java b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineThingDataHelper.java new file mode 100644 index 00000000..98dff050 --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/java/org/jetlinks/community/tdengine/things/TDengineThingDataHelper.java @@ -0,0 +1,274 @@ +package org.jetlinks.community.tdengine.things; + +import lombok.AllArgsConstructor; +import org.hswebframework.ezorm.core.dsl.Query; +import org.hswebframework.ezorm.core.param.Sort; +import org.hswebframework.ezorm.core.param.Term; +import org.hswebframework.ezorm.core.param.TermType; +import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrappers; +import org.hswebframework.utils.time.DateFormatter; +import org.hswebframework.utils.time.DefaultDateFormatter; +import org.hswebframework.web.api.crud.entity.PagerResult; +import org.hswebframework.web.api.crud.entity.QueryParamEntity; +import org.hswebframework.web.exception.BusinessException; +import org.jetlinks.community.tdengine.term.TDengineQueryConditionBuilder; +import org.jetlinks.core.metadata.Converter; +import org.jetlinks.core.metadata.DataType; +import org.jetlinks.community.Interval; +import org.jetlinks.community.tdengine.Point; +import org.jetlinks.community.tdengine.TDengineOperations; +import org.jetlinks.community.things.data.MetricMetadataManager; +import org.jetlinks.community.things.data.PropertyAggregation; +import org.jetlinks.community.timeseries.TimeSeriesData; +import org.jetlinks.community.utils.ConverterUtils; +import org.jetlinks.community.utils.ObjectMappers; +import org.jetlinks.reactor.ql.utils.CastUtils; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.function.Predicate3; + +import java.util.*; +import java.util.function.Function; +import java.util.regex.Pattern; + +@AllArgsConstructor +class TDengineThingDataHelper implements Disposable { + + static { + DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ")); + + } + + final TDengineOperations operations; + + final MetricMetadataManager metadataManager; + + //转换聚合函数 + public static String convertAggFunction(PropertyAggregation agg) { + switch (agg.getAgg()) { + case NONE: + throw new BusinessException("error.unsupported_aggregation_condition", 500, agg); + default: + return agg.getAgg().name().toLowerCase(); + } + } + + public static boolean isArrayTerm(DataType type, Term term) { + String termType = term.getTermType().toLowerCase(); + return TermType.btw.equals(termType) + || TermType.nbtw.equals(termType) + || TermType.in.equals(termType) + || TermType.nin.equals(termType); + } + + public static String getGroupByTime(Interval interval) { + return "interval(" + interval.toString() + ")"; + } + + + public static Object tryConvertList(DataType type, Term term) { + return ConverterUtils + .tryConvertToList(term.getValue(), val -> { + if (type instanceof Converter) { + return ((Converter) type).convert(val); + } + return val; + }); + } + + //预处理查询条件 + public List prepareTerms(String metric, List terms) { + + if (CollectionUtils.isEmpty(terms)) { + return terms; + } + for (Term term : terms) { + //适配时间戳字段,查询统一使用timestamp + if (("timestamp".equals(term.getColumn()) || "_ts".equals(term.getColumn())) && term.getValue() != null) { + term.setColumn("_ts"); + term.setValue(prepareTimestampValue(term.getValue(), term.getTermType())); + } else { + metadataManager + .getColumn(metric, term.getColumn()) + .ifPresent(meta -> { + DataType type = meta.getValueType(); + if (isArrayTerm(type, term)) { + term.setValue(tryConvertList(type, term)); + } else if (type instanceof Converter) { + term.setValue(((Converter) type).convert(term.getValue())); + } + }); + } + + term.setTerms(prepareTerms(metric, term.getTerms())); + } + + return terms; + } + + public static Object prepareTimestampValue(Object value, String type) { + + return ConverterUtils.tryConvertToList(value,v->{ + Date date = CastUtils.castDate(v); + return date.getTime(); + }); + } + + public static String buildOrderBy(QueryParamEntity param) { + + for (Sort sort : param.getSorts()) { + if (sort.getName().equalsIgnoreCase("timestamp")) { + return " order by `_ts` " + sort.getOrder(); + } + } + return " order by `_ts` desc"; + } + + public String buildWhere(String metric, QueryParamEntity param, String... and) { + StringJoiner joiner = new StringJoiner(" ", "where ", ""); + + String sql = TDengineQueryConditionBuilder.build(prepareTerms(metric, param.getTerms())); + + if (StringUtils.hasText(sql)) { + joiner.add(sql); + } + + if (StringUtils.hasText(sql) && and.length > 0) { + joiner.add("and"); + } + for (int i = 0; i < and.length; i++) { + if (i > 0) { + joiner.add("and"); + } + joiner.add(and[i]); + } + + return joiner.length() == 6 ? "" : joiner.toString(); + } + + public Flux query(String sql) { + return operations + .forQuery() + .query(sql, ResultWrappers.map()) + .mapNotNull(TDengineThingDataHelper::convertToTsData); + } + + protected Flux doQuery(String metric, Query query) { + QueryParamEntity param = query.getParam(); + StringJoiner joiner = new StringJoiner(""); + joiner.add("select * from") + .add(" `") + .add(metric) + .add("` ") + .add(buildWhere(metric, param)) + .add(buildOrderBy(param)); + + if (param.isPaging()) { + joiner.add(" limit ").add(String.valueOf(param.getPageSize())) + .add(" offset ") + .add(String.valueOf(param.getPageSize() * param.getPageIndex())); + } + return operations + .forQuery() + .query(joiner.toString(), ResultWrappers.map()) + .mapNotNull(TDengineThingDataHelper::convertToTsData); + } + + public static TimeSeriesData convertToTsData(Map map) { + Date ts = convertTs(map.remove("_ts")); + return TimeSeriesData.of(ts, map); + } + + protected Mono> doQueryPage(String metric, + Query query, + Function mapper) { + QueryParamEntity param = query.getParam(); + String sql = "`" + metric + "` " + buildWhere(metric, param); + String countSql = "select count(1) total from " + sql; + String dataSql = "select * from " + sql + buildOrderBy(param) + " limit " + param.getPageSize() + " offset " + param + .getPageIndex() * param.getPageSize(); + + return Mono.zip( + operations.forQuery() + .query(countSql, ResultWrappers.map()) + .singleOrEmpty() + .map(data -> CastUtils.castNumber(data.getOrDefault("total", 0)).intValue()) + .defaultIfEmpty(0), + operations.forQuery() + .query(dataSql, ResultWrappers.map()) + .mapNotNull(map -> mapper.apply(convertToTsData(map))) + .collectList(), + (total, data) -> PagerResult.of(total, data, param) + ); + } + + private static final DateTimeFormatter format = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS"); + + private static Date convertTs(Object ts) { + if (ts == null) { + throw new IllegalArgumentException(); + } + if (ts instanceof Number) { + return new Date(((Number) ts).longValue()); + } + + return DateTime.parse(String.valueOf(ts)).toDate(); + } + + + //转换值为influxDB field或者tag + public void applyValue(Point builder, + String metric, + String key, + Object value, + Predicate3 tagTest) { + if (value == null) { + return; + } + if (tagTest.test(metric, key, value)) { + builder.tag(key, String.valueOf(value)); + } else if (value instanceof Number) { + builder.value(key, ((Number) value).doubleValue()); + } else if (value instanceof Date) { + builder.value(key, ((Date) value).getTime()); + } else { + if (value instanceof String) { + builder.value(key, String.valueOf(value)); + } else { + builder.value(key, ObjectMappers.toJsonString(value)); + } + } + } + + public Point convertToPoint(String metric, TimeSeriesData data, Predicate3 tagTest) { + Point point = Point.of(metric, null) + .timestamp(data.getTimestamp()); + + for (Map.Entry entry : data.values().entrySet()) { + applyValue(point, metric, entry.getKey(), entry.getValue(), tagTest); + } + return point; + } + + public Mono doSave(String metric, TimeSeriesData data, Predicate3 tagTest) { + return operations.forWrite().write(convertToPoint(metric, data, tagTest)); + } + + + public Mono doSave(String metric, Flux dataFlux, Predicate3 tagTest) { + + return operations.forWrite().write(dataFlux.map(data -> convertToPoint(metric, data, tagTest))); + } + + + @Override + public void dispose() { + operations.dispose(); + } +} diff --git a/jetlinks-components/tdengine-component/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/jetlinks-components/tdengine-component/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..5d94e59e --- /dev/null +++ b/jetlinks-components/tdengine-component/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1,2 @@ +org.jetlinks.community.tdengine.TDengineConfiguration +org.jetlinks.community.tdengine.things.TDengineThingDataConfiguration \ No newline at end of file diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/measurement/AlarmTimeSeriesMetric.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/measurement/AlarmTimeSeriesMetric.java index aeced99d..fa176201 100755 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/measurement/AlarmTimeSeriesMetric.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/measurement/AlarmTimeSeriesMetric.java @@ -7,7 +7,7 @@ import org.jetlinks.community.timeseries.TimeSeriesMetric; * * @author bestfeng * - * @see org.jetlinks.pro.timeseries.TimeSeriesService + * @see org.jetlinks.community.timeseries.TimeSeriesService * @see TimeSeriesMetric */ public interface AlarmTimeSeriesMetric { diff --git a/jetlinks-standalone/pom.xml b/jetlinks-standalone/pom.xml index 8a6a2c7e..7c751242 100644 --- a/jetlinks-standalone/pom.xml +++ b/jetlinks-standalone/pom.xml @@ -233,7 +233,7 @@ - com.github.xiaoymin + com.github.exiaoymin knife4j-springdoc-ui 2.0.8 @@ -243,6 +243,12 @@ configure-component ${project.version} + + + org.jetlinks.community + tdengine-component + ${project.version} + diff --git a/jetlinks-standalone/src/main/resources/application.yml b/jetlinks-standalone/src/main/resources/application.yml index b9d790a8..7147effc 100644 --- a/jetlinks-standalone/src/main/resources/application.yml +++ b/jetlinks-standalone/src/main/resources/application.yml @@ -22,12 +22,12 @@ spring: pool: max-active: 1024 timeout: 20s -# database: 3 + # database: 3 # max-wait: 10s r2dbc: # 需要手动创建数据库,启动会自动创建表,修改了配置easyorm相关配置也要修改 url: r2dbc:postgresql://localhost:5432/jetlinks -# url: r2dbc:mysql://localhost:3306/jetlinks?ssl=false&serverZoneId=Asia/Shanghai # 修改了配置easyorm相关配置也要修改 + # url: r2dbc:mysql://localhost:3306/jetlinks?ssl=false&serverZoneId=Asia/Shanghai # 修改了配置easyorm相关配置也要修改 username: postgres password: jetlinks pool: @@ -47,6 +47,14 @@ spring: easyorm: default-schema: public # 数据库默认的schema dialect: postgres #数据库方言 +tdengine: + enabled: false + database: jetlinks + restful: + endpoints: + - http://localhost:6041/ + username: root + password: taosdata elasticsearch: embedded: enabled: false # 为true时使用内嵌的elasticsearch,不建议在生产环境中使用 @@ -62,7 +70,7 @@ device: message: writer: time-series: - enabled: true #写出设备消息数据到elasticsearch + enabled: true #对设备数据进行持久化 captcha: enabled: false # 开启验证码 ttl: 2m #验证码过期时间,2分钟 @@ -72,9 +80,8 @@ hsweb: configs: - path: /** allowed-headers: "*" - allowed-methods: ["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"] - allowed-origins: ["*"] -# allow-credentials: true + allowed-methods: [ "GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS" ] + allowed-origins: [ "*" ] ## 生产环境请替换为具体的域名端口如: http://xxxxx max-age: 1800 dict: enum-packages: org.jetlinks @@ -108,7 +115,7 @@ file: manager: storage-base-path: ./data/files api: - # 访问api接口的根地址 + # 访问api接口的根地址 base-path: http://127.0.0.1:${server.port} jetlinks: