增加tdengine

This commit is contained in:
zhouhao 2022-11-17 11:36:28 +08:00
parent 95ac6b7af3
commit 8ee311a9ef
37 changed files with 2150 additions and 9 deletions

View File

@ -27,6 +27,7 @@
<module>script-component</module>
<module>protocol-component</module>
<module>relation-component</module>
<module>tdengine-component</module>
</modules>
<artifactId>jetlinks-components</artifactId>

View File

@ -0,0 +1,11 @@
version: "2"
services:
tdengine:
image: tdengine/tdengine:3.0.0.1
ports:
- "6030-6040:6030-6040/udp"
- "6030:6030"
- "6035:6035"
- "6041:6041"
environment:
TZ: CST-8

View File

@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>tdengine-component</artifactId>
<dependencies>
<dependency>
<groupId>org.jetlinks.community</groupId>
<artifactId>timeseries-component</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.hswebframework</groupId>
<artifactId>hsweb-easy-orm-rdb</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.zaxxer/HikariCP -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
</dependency>
<dependency>
<groupId>org.jetlinks.community</groupId>
<artifactId>things-component</artifactId>
<version>${project.version}</version>
<optional>true</optional>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,24 @@
package org.jetlinks.community.tdengine;
import lombok.AllArgsConstructor;
/**
 * Composite {@link TDengineOperations} that delegates the write and query
 * sides to two independently supplied implementations.
 */
public class DetectTDengineOperations implements TDengineOperations {

    private final TDEngineDataWriter writer;
    private final TDEngineQueryOperations query;

    public DetectTDengineOperations(TDEngineDataWriter writer, TDEngineQueryOperations query) {
        this.writer = writer;
        this.query = query;
    }

    /** @return the delegate handling all write operations */
    @Override
    public TDEngineDataWriter forWrite() {
        return writer;
    }

    /** @return the delegate handling all query operations */
    @Override
    public TDEngineQueryOperations forQuery() {
        return query;
    }

    /** Disposes the writer; the query delegate holds no disposable resources here. */
    @Override
    public void dispose() {
        writer.dispose();
    }
}

View File

@ -0,0 +1,56 @@
package org.jetlinks.community.tdengine;
import com.google.common.collect.Maps;
import lombok.*;
import java.util.Map;
/**
 * A single time-series data point: a metric (super table), a destination table,
 * a timestamp, plus maps of field values and tags.
 * Built fluently, e.g. {@code Point.of("m", "t").timestamp(ts).value("k", v)}.
 */
@Getter
@Setter
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class Point {
    private String metric;
    private String table;
    // linked maps keep insertion order so the generated line protocol is stable
    private Map<String, Object> values = Maps.newLinkedHashMapWithExpectedSize(32);
    private Map<String, Object> tags = Maps.newLinkedHashMapWithExpectedSize(8);
    // event time; the writer interprets this as epoch milliseconds
    private long timestamp;
    public Point(String metric, String table) {
        this.metric = metric;
        this.table = table;
    }
    /** Static factory, equivalent to {@code new Point(metric, table)}. */
    public static Point of(String metric, String table) {
        return new Point(metric, table);
    }
    /** Sets the event time (epoch millis) and returns {@code this}. */
    public Point timestamp(long timestamp) {
        this.timestamp = timestamp;
        return this;
    }
    /** Adds one tag entry and returns {@code this}. */
    public Point tag(String metric, Object value) {
        tags.put(metric, value);
        return this;
    }
    /** Adds all entries to the tag map and returns {@code this}. */
    public Point tags(Map<String, Object> values) {
        this.tags.putAll(values);
        return this;
    }
    /** Adds one field value and returns {@code this}. */
    public Point value(String metric, Object value) {
        values.put(metric, value);
        return this;
    }
    /** Adds all entries to the value map and returns {@code this}. */
    public Point values(Map<String, Object> values) {
        this.values.putAll(values);
        return this;
    }
}

View File

@ -0,0 +1,13 @@
package org.jetlinks.community.tdengine;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * Write-side API for persisting {@link Point}s into TDengine.
 * Implementations may buffer writes; {@link Disposable#dispose()} releases
 * any such resources.
 * NOTE(review): class name uses "TDEngine" while most siblings use "TDengine" — consider unifying.
 */
public interface TDEngineDataWriter extends Disposable {
    /** Writes a single point; completes empty once the point has been accepted. */
    Mono<Void> write(Point point);
    /** Writes a stream of points; completes once the whole flux has been accepted. */
    Mono<Void> write(Flux<Point> points);
}

View File

@ -0,0 +1,12 @@
package org.jetlinks.community.tdengine;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import reactor.core.publisher.Flux;
import java.util.Map;
/**
 * Query-side API: executes a raw SQL string against TDengine and maps each
 * result row through the supplied {@link ResultWrapper}.
 */
public interface TDEngineQueryOperations {
    /**
     * Executes {@code sql} and emits one wrapped element per result row.
     *
     * @param sql     native TDengine SQL to execute
     * @param wrapper converts raw row columns into {@code E} instances
     */
    <E> Flux<E> query(String sql, ResultWrapper<E,?> wrapper);
}

View File

@ -0,0 +1,49 @@
package org.jetlinks.community.tdengine;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.function.client.ClientResponse;
import reactor.core.publisher.Mono;
@Slf4j
public class TDEngineUtils {
public static Mono<JSONObject> checkExecuteResult(ClientResponse response) {
if (response.statusCode().isError()) {
return response
.bodyToMono(String.class)
.doOnNext(str -> {
throw new TDengineException(null, str);
})
.switchIfEmpty(Mono.error(() -> new TDengineException(null, response.statusCode().getReasonPhrase())))
.then(Mono.empty());
}
return response
.bodyToMono(String.class)
.map(json -> {
JSONObject obj = JSON.parseObject(json);
checkExecuteResult(null, obj);
return obj;
});
}
public static void checkExecuteResult(String sql, JSONObject result) {
if (result.getInteger("code") != 0) {
String error = result.getString("desc");
if (sql != null && sql.startsWith("describe") && error.contains("does not exist")) {
return;
}
if (sql != null) {
log.warn("execute tdengine sql error [{}]: [{}]", error, sql);
}
throw new TDengineException(sql, result.getString("desc"));
}
}
}

View File

@ -0,0 +1,29 @@
package org.jetlinks.community.tdengine;
import org.jetlinks.community.tdengine.restful.RestfulTDEngineQueryOperations;
import org.jetlinks.community.tdengine.restful.SchemalessTDEngineDataWriter;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.client.WebClient;
/**
 * Spring auto-configuration for the TDengine component; active only when
 * {@code tdengine.enabled=true} is set.
 */
@AutoConfiguration
@ConditionalOnProperty(prefix = "tdengine", value = "enabled", havingValue = "true")
@EnableConfigurationProperties(TDengineProperties.class)
public class TDengineConfiguration {
    /**
     * Builds the default operations backed by the REST API: schemaless
     * line-protocol writes plus SQL queries, sharing a single {@link WebClient}.
     * {@code dispose()} is invoked on context shutdown to stop the write buffer.
     */
    @Bean(destroyMethod = "dispose")
    @ConditionalOnMissingBean(TDengineOperations.class)
    public TDengineOperations tDengineOperations(TDengineProperties properties) {
        WebClient client = properties.getRestful().createClient();
        SchemalessTDEngineDataWriter writer = new SchemalessTDEngineDataWriter(client,
            properties.getDatabase(),
            properties.getBuffer());
        return new DetectTDengineOperations(writer, new RestfulTDEngineQueryOperations(client, properties.getDatabase()));
    }
}

View File

@ -0,0 +1,14 @@
package org.jetlinks.community.tdengine;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
/**
 * Column property keys used by the TDengine schema builders to mark special columns.
 */
public interface TDengineConstants {
    // marks a column as a TDengine TAG (as opposed to an ordinary data column)
    String COLUMN_IS_TAG = "tag";
    // marks a column as the timestamp column (handled implicitly as "_ts")
    String COLUMN_IS_TS = "ts";
}

View File

@ -0,0 +1,22 @@
package org.jetlinks.community.tdengine;
import lombok.Generated;
import lombok.Getter;
/**
 * Raised when TDengine reports an error for an executed statement or a write.
 */
@Generated
public class TDengineException extends RuntimeException {
    // the SQL statement that failed; may be null when no statement is associated
    @Getter
    private final String sql;
    /**
     * @param sql     offending SQL, or {@code null}
     * @param message error description reported by TDengine
     */
    @Generated
    public TDengineException(String sql, String message) {
        super(message);
        this.sql = sql;
    }
    /**
     * @param sql     offending SQL, or {@code null}
     * @param message error description
     * @param cause   underlying failure
     */
    @Generated
    public TDengineException(String sql, String message, Throwable cause) {
        super(message, cause);
        this.sql = sql;
    }
}

View File

@ -0,0 +1,11 @@
package org.jetlinks.community.tdengine;
import reactor.core.Disposable;
/**
 * Entry point combining the write and query sides of the TDengine component.
 * Disposing releases underlying resources (e.g. the write buffer).
 */
public interface TDengineOperations extends Disposable {
    /** @return operations for writing data points */
    TDEngineDataWriter forWrite();
    /** @return operations for executing queries */
    TDEngineQueryOperations forQuery();
}

View File

@ -0,0 +1,129 @@
package org.jetlinks.community.tdengine;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import io.netty.util.internal.ThreadLocalRandom;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.community.buffer.BufferProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import org.springframework.util.StringUtils;
import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.client.*;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;
import javax.sql.DataSource;
import javax.validation.constraints.NotBlank;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
 * Configuration for the TDengine component ({@code tdengine.*} properties):
 * target database, connector selection, REST endpoint settings and the
 * local write-buffer configuration.
 */
@Getter
@Setter
@ConfigurationProperties(prefix = "tdengine")
public class TDengineProperties {

    /** Target database name; required. */
    @NotBlank
    private String database;

    /** Connector type; only the REST connector is currently implemented. */
    private Connector connector = Connector.restful;

    private RestfulConnector restful = new RestfulConnector();

    // local write-buffer configuration
    private Buffer buffer = new Buffer();

    enum Connector {
        restful
    }

    @Getter
    @Setter
    public static class Buffer extends BufferProperties {
        // when false, points are written through immediately without buffering
        private boolean enabled = true;

        public Buffer() {
            setFilePath("./data/tdengine-buffer");
            setSize(3000);
        }
    }

    @Getter
    @Setter
    public static class RestfulConnector {
        private List<URI> endpoints = new ArrayList<>(Collections.singletonList(URI.create("http://localhost:6041/")));
        private String username = "root";
        private String password = "taosdata";
        private int maxConnections = Runtime.getRuntime().availableProcessors() * 8;
        private Duration pendingAcquireTimeout = Duration.ofSeconds(10);
        private Duration evictInBackground = Duration.ofSeconds(60);
        private Duration connectionTimeout = Duration.ofSeconds(5);
        private Duration socketTimeout = Duration.ofSeconds(5);
        private DataSize maxInMemorySize = DataSize.ofMegabytes(10);

        /**
         * Picks one configured endpoint at random.
         * Uses the JDK's {@code ThreadLocalRandom} rather than netty's
         * {@code io.netty.util.internal} copy, which is not public API.
         * TODO: smarter load balancing than random selection
         */
        public URI selectURI() {
            return endpoints.get(java.util.concurrent.ThreadLocalRandom.current().nextInt(endpoints.size()));
        }

        /**
         * Creates a {@link WebClient} targeting the first endpoint; when several
         * endpoints are configured, a filter rewrites each request's host/port to
         * a randomly selected endpoint. Basic auth is applied when a username is set.
         */
        public WebClient createClient() {
            WebClient.Builder builder = WebClient.builder();
            URI endpoint = endpoints.get(0);
            if (endpoints.size() > 1) {
                builder = builder.filter((request, next) -> {
                    URI target = selectURI();
                    if (target.equals(endpoint)) {
                        return next.exchange(request);
                    }
                    URI uri = UriComponentsBuilder
                        .fromUri(request.url())
                        .host(target.getHost())
                        .port(target.getPort())
                        .build()
                        .toUri();
                    return next
                        .exchange(ClientRequest
                            .from(request)
                            .url(uri)
                            .build());
                });
            }
            return builder
                .codecs(clientCodecConfigurer -> clientCodecConfigurer
                    .defaultCodecs()
                    .maxInMemorySize((int) maxInMemorySize.toBytes()))
                .defaultHeaders(headers -> {
                    if (StringUtils.hasText(username)) {
                        headers.setBasicAuth(username, password);
                    }
                })
                .baseUrl(endpoint.toString())
                .build();
        }
    }
}

View File

@ -0,0 +1,66 @@
package org.jetlinks.community.tdengine.metadata;
import org.hswebframework.ezorm.rdb.executor.DefaultBatchSqlRequest;
import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.PrepareSqlFragments;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.ddl.CommonAlterTableSqlBuilder;
import org.jetlinks.community.tdengine.TDengineConstants;
import static org.hswebframework.ezorm.rdb.operator.builder.fragments.PrepareSqlFragments.of;
/**
 * Generates TDengine {@code ALTER STABLE} statements. The timestamp column is
 * managed implicitly by TDengine and therefore skipped; tag columns use the
 * {@code TAG} keyword instead of {@code COLUMN}.
 */
public class TDengineAlterTableSqlBuilder extends CommonAlterTableSqlBuilder {

    @Override
    protected void appendAddColumnCommentSql(DefaultBatchSqlRequest batch, RDBColumnMetadata column) {
        // TDengine does not support column comments; intentionally a no-op
    }

    protected PrepareSqlFragments createAlterTable(RDBColumnMetadata column) {
        return of()
            .addSql("ALTER", "STABLE", column.getOwner().getFullName());
    }

    /** Keyword matching the column kind: TAG for tag-marked columns, COLUMN otherwise. */
    private String columnKeyword(RDBColumnMetadata column) {
        // NOTE: the previous ternary was inverted (tag columns emitted COLUMN and vice versa)
        return column.getProperty(TDengineConstants.COLUMN_IS_TAG).isTrue() ? "TAG" : "COLUMN";
    }

    @Override
    protected void appendAddColumnSql(DefaultBatchSqlRequest batch, RDBColumnMetadata column) {
        if (column.getProperty(TDengineConstants.COLUMN_IS_TS).isTrue()) {
            return; // timestamp column is created implicitly, never altered
        }
        PrepareSqlFragments fragments = createAlterTable(column);
        fragments
            .addSql("ADD", columnKeyword(column))
            .addSql(column.getName())
            .addSql(column.getDataType());
        batch.addBatch(fragments.toRequest());
    }

    @Override
    protected void appendDropColumnSql(DefaultBatchSqlRequest batch, RDBColumnMetadata drop) {
        if (drop.getProperty(TDengineConstants.COLUMN_IS_TS).isTrue()) {
            return;
        }
        PrepareSqlFragments fragments = createAlterTable(drop);
        fragments.addSql("DROP", columnKeyword(drop))
                 .addSql(drop.getName());
        batch.addBatch(fragments.toRequest());
    }

    @Override
    protected void appendAlterColumnSql(DefaultBatchSqlRequest batch,
                                        RDBColumnMetadata oldColumn,
                                        RDBColumnMetadata newColumn) {
        if (newColumn.getProperty(TDengineConstants.COLUMN_IS_TS).isTrue()) {
            return;
        }
        PrepareSqlFragments fragments = createAlterTable(newColumn);
        fragments.addSql("MODIFY", columnKeyword(newColumn))
                 .addSql(newColumn.getName())
                 .addSql(newColumn.getDataType());
        batch.addBatch(fragments.toRequest());
    }
}

View File

@ -0,0 +1,61 @@
package org.jetlinks.community.tdengine.metadata;
import lombok.Getter;
import lombok.Setter;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
import org.hswebframework.ezorm.rdb.metadata.RDBTableMetadata;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.PrepareSqlFragments;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.ddl.CreateTableSqlBuilder;
import org.jetlinks.community.tdengine.TDengineConstants;
import java.util.ArrayList;
import java.util.List;
@SuppressWarnings("all")
@Getter
@Setter
public class TDengineCreateTableSqlBuilder implements CreateTableSqlBuilder {

    /**
     * Builds a {@code CREATE STABLE IF NOT EXISTS} statement. A fixed leading
     * {@code _ts timestamp} column is always emitted; ts-marked columns are
     * skipped and tag-marked columns are collected into the trailing
     * {@code TAGS (...)} clause.
     */
    @Override
    public SqlRequest build(RDBTableMetadata table) {
        PrepareSqlFragments sql = PrepareSqlFragments.of();
        // (an unused column-name list was removed here)
        sql.addSql("CREATE STABLE IF NOT EXISTS", table.getFullName(), "(")
           .addSql("_ts timestamp");
        List<RDBColumnMetadata> tags = new ArrayList<>();
        for (RDBColumnMetadata column : table.getColumns()) {
            if (column.getProperty(TDengineConstants.COLUMN_IS_TS).isTrue()) {
                continue; // timestamp handled by the fixed _ts column above
            }
            if (column.getProperty(TDengineConstants.COLUMN_IS_TAG).isTrue()) {
                tags.add(column); // tags go after the regular column list
                continue;
            }
            sql.addSql(",")
               .addSql(column.getQuoteName())
               .addSql(column.getDataType());
        }
        sql.addSql(")");
        if (!tags.isEmpty()) {
            sql.addSql("TAGS (");
            int index = 0;
            for (RDBColumnMetadata tag : tags) {
                if (index++ > 0) {
                    sql.addSql(",");
                }
                sql.addSql(tag.getQuoteName())
                   .addSql(tag.getDataType());
            }
            sql.addSql(")");
        }
        return sql.toRequest();
    }
}

View File

@ -0,0 +1,52 @@
package org.jetlinks.community.tdengine.metadata;
import org.hswebframework.ezorm.rdb.metadata.DataType;
import org.hswebframework.ezorm.rdb.metadata.dialect.DefaultDialect;
import java.math.BigDecimal;
import java.sql.JDBCType;
/**
 * SQL dialect for TDengine: backtick identifier quoting, lower-case column
 * names, and a type mapping where all decimal-like types collapse to DOUBLE.
 */
public class TDengineDialect extends DefaultDialect {

    public TDengineDialect() {
        super();
        // TDengine has no DECIMAL/NUMERIC storage type; map them all to DOUBLE
        registerAsDouble("decimal", JDBCType.DECIMAL);
        registerAsDouble("numeric", JDBCType.NUMERIC);
        registerAsDouble("number", JDBCType.DOUBLE);
        addDataTypeBuilder(JDBCType.VARCHAR, column -> "varchar(" + column.getLength() + ")");
        addDataTypeBuilder(JDBCType.TIMESTAMP, column -> "timestamp");
    }

    // registers a named type backed by BigDecimal that renders as DOUBLE in DDL
    private void registerAsDouble(String name, JDBCType jdbcType) {
        registerDataType(name, DataType.builder(DataType.jdbc(jdbcType, BigDecimal.class), column -> "DOUBLE"));
    }

    @Override
    public String getQuoteStart() {
        return "`";
    }

    @Override
    public String getQuoteEnd() {
        return "`";
    }

    @Override
    public boolean isColumnToUpperCase() {
        // keep identifiers lower case for TDengine
        return false;
    }

    @Override
    public String getId() {
        return "tdengine";
    }

    @Override
    public String getName() {
        return "TDengine";
    }
}

View File

@ -0,0 +1,90 @@
package org.jetlinks.community.tdengine.metadata;
import lombok.AllArgsConstructor;
import org.hswebframework.ezorm.core.meta.ObjectMetadata;
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrappers;
import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
import org.hswebframework.ezorm.rdb.metadata.RDBSchemaMetadata;
import org.hswebframework.ezorm.rdb.metadata.RDBTableMetadata;
import org.hswebframework.ezorm.rdb.metadata.parser.TableMetadataParser;
import org.jetlinks.community.tdengine.TDengineConstants;
import org.jetlinks.reactor.ql.utils.CastUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
 * Reads TDengine super-table metadata ({@code show stables} / {@code describe})
 * and exposes it as ezorm table metadata. Only the reactive parser methods are
 * supported; the blocking ones throw {@link UnsupportedOperationException}.
 */
@AllArgsConstructor
public class TDengineMetadataParser implements TableMetadataParser {
    private final RDBSchemaMetadata schema;
    // resolves the reactive SQL executor feature registered on the schema
    private ReactiveSqlExecutor sql() {
        return schema.findFeatureNow(ReactiveSqlExecutor.ID);
    }
    @Override
    public List<String> parseAllTableName() {
        throw new UnsupportedOperationException();
    }
    /** Lists all super-table names via {@code show stables}. */
    @Override
    public Flux<String> parseAllTableNameReactive() {
        return sql()
            .select("show stables", ResultWrappers.map())
            .mapNotNull(map -> (String) map.get("stable_name"));
    }
    @Override
    public boolean tableExists(String name) {
        throw new UnsupportedOperationException();
    }
    @Override
    public Mono<Boolean> tableExistsReactive(String name) {
        return parseAllTableNameReactive()
            .hasElement(name);
    }
    @Override
    public Optional<? extends ObjectMetadata> parseByName(String s) {
        throw new UnsupportedOperationException();
    }
    @Override
    public List<? extends ObjectMetadata> parseAll() {
        throw new UnsupportedOperationException();
    }
    /** Describes one super table; completes empty when it yields no columns (table absent). */
    @Override
    public Mono<RDBTableMetadata> parseByNameReactive(String name) {
        RDBTableMetadata table = schema.newTable(name);
        return sql()
            .select("describe "+table.getFullName(), ResultWrappers.map())
            .doOnNext(column -> table.addColumn(convertToColumn(column)))
            .then(Mono.fromSupplier(() -> table.getColumns().isEmpty() ? null : table));
    }
    // NOTE(review): the keys "Note"/"Field"/"Type"/"Length" assume capitalized
    // describe result columns; TDengine 3.x returns lower-case names — verify
    // against the server version actually deployed.
    private RDBColumnMetadata convertToColumn(Map<String, Object> columnInfo) {
        String note = (String) columnInfo.getOrDefault("Note", "");
        String column = (String) columnInfo.get("Field");
        String type = (String) columnInfo.get("Type");
        int length = CastUtils.castNumber(columnInfo.get("Length")).intValue();
        RDBColumnMetadata metadata = new RDBColumnMetadata();
        metadata.setName(column);
        // a "tag" note marks the column as a TDengine TAG
        metadata.setProperty(TDengineConstants.COLUMN_IS_TAG, "tag".equalsIgnoreCase(note));
        metadata.setLength(length);
        metadata.setType(schema.getDialect().convertDataType(type));
        return metadata;
    }
    /** Describes every super table found by {@link #parseAllTableNameReactive()}. */
    @Override
    public Flux<RDBTableMetadata> parseAllReactive() {
        return parseAllTableNameReactive()
            .flatMap(this::parseByNameReactive);
    }
}

View File

@ -0,0 +1,125 @@
package org.jetlinks.community.tdengine.metadata;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.ezorm.rdb.executor.BatchSqlRequest;
import org.hswebframework.ezorm.rdb.executor.DefaultColumnWrapperContext;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.jetlinks.community.tdengine.TDengineException;
import org.jetlinks.core.utils.Reactors;
import org.reactivestreams.Publisher;
import org.springframework.util.CollectionUtils;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.stream.Collectors;
@Slf4j
@AllArgsConstructor
public class TDengineRestfulSqlExecutor implements ReactiveSqlExecutor {
private final WebClient client;
@Override
public Mono<Integer> update(Publisher<SqlRequest> request) {
return this
.doExecute(request)
.then(Reactors.ALWAYS_ONE);
}
@Override
public Mono<Void> execute(Publisher<SqlRequest> request) {
return this
.doExecute(request)
.then();
}
@Override
public <E> Flux<E> select(Publisher<SqlRequest> requests, ResultWrapper<E, ?> wrapper) {
return this
.doExecute(requests)
.flatMap(response -> convertQueryResult(response, wrapper));
}
private Flux<JSONObject> doExecute(Publisher<SqlRequest> requests) {
return Flux
.from(requests)
.expand(request -> {
if (request instanceof BatchSqlRequest) {
return Flux.fromIterable(((BatchSqlRequest) request).getBatch());
}
return Flux.empty();
})
.filter(SqlRequest::isNotEmpty)
.concatMap(request -> {
String sql = request.toNativeSql();
log.trace("Execute ==> {}", sql);
return client
.post()
.uri("/rest/sql")
.bodyValue(sql)
.exchangeToMono(response -> response
.bodyToMono(String.class)
.map(json -> {
JSONObject result = JSON.parseObject(json);
checkExecuteResult(sql, result);
return result;
}));
});
}
private void checkExecuteResult(String sql, JSONObject result) {
if (result.getInteger("code") != 0) {
String error = result.getString("desc");
if (sql.startsWith("describe") && error.contains("does not exist")) {
return;
}
log.warn("execute tdengine sql error [{}]: [{}]", error, sql);
throw new TDengineException(sql, result.getString("desc"));
}
}
protected <E> Flux<E> convertQueryResult(JSONObject result, ResultWrapper<E, ?> wrapper) {
JSONArray head = result.getJSONArray("column_meta");
JSONArray data = result.getJSONArray("data");
if (CollectionUtils.isEmpty(head) || CollectionUtils.isEmpty(data)) {
return Flux.empty();
}
List<String> columns = head.stream()
.map(v-> ((JSONArray) v).getString(0))
.collect(Collectors.toList());
return Flux.create(sink -> {
wrapper.beforeWrap(() -> columns);
for (Object rowo : data) {
E rowInstance = wrapper.newRowInstance();
JSONArray row = (JSONArray) rowo;
for (int i = 0; i < columns.size(); i++) {
String property = columns.get(i);
Object value = row.get(i);
DefaultColumnWrapperContext<E> context = new DefaultColumnWrapperContext<>(i, property, value, rowInstance);
wrapper.wrapColumn(context);
rowInstance = context.getRowInstance();
}
if (!wrapper.completedWrapRow(rowInstance)) {
break;
}
if (rowInstance != null) {
sink.next(rowInstance);
}
}
wrapper.completedWrap();
sink.complete();
});
}
}

View File

@ -0,0 +1,15 @@
package org.jetlinks.community.tdengine.metadata;
import org.hswebframework.ezorm.rdb.metadata.RDBSchemaMetadata;
/**
 * ezorm schema pre-wired with the TDengine metadata parser and the
 * create/alter STABLE SQL builders.
 */
public class TDengineSchema extends RDBSchemaMetadata {
    public TDengineSchema(String name) {
        super(name);
        // register TDengine-specific features on top of the defaults
        addFeature(new TDengineMetadataParser(this));
        addFeature(new TDengineCreateTableSqlBuilder());
        addFeature(new TDengineAlterTableSqlBuilder());
    }
}

View File

@ -0,0 +1,81 @@
package org.jetlinks.community.tdengine.restful;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.ezorm.rdb.executor.DefaultColumnWrapperContext;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.jetlinks.community.tdengine.TDEngineQueryOperations;
import org.springframework.util.CollectionUtils;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.stream.Collectors;
import static org.jetlinks.community.tdengine.TDEngineUtils.checkExecuteResult;
/**
 * REST-based implementation of {@link TDEngineQueryOperations}: posts SQL to
 * {@code /rest/sql/{database}} and maps result rows through ezorm wrappers.
 * NOTE(review): convertQueryResult duplicates the implementation in
 * TDengineRestfulSqlExecutor — consider extracting a shared helper.
 */
@AllArgsConstructor
@Slf4j
public class RestfulTDEngineQueryOperations implements TDEngineQueryOperations {
    private final WebClient client;
    private final String database;
    @Override
    public <E> Flux<E> query(String sql, ResultWrapper<E, ?> wrapper) {
        log.trace("Execute ==> {}", sql);
        return client
            .post()
            .uri("/rest/sql/"+database)
            .bodyValue(sql)
            .exchangeToFlux(response -> response
                .bodyToMono(String.class)
                .flatMapMany(json -> {
                    JSONObject result = JSON.parseObject(json);
                    // throws TDengineException when the server reports an error code
                    checkExecuteResult(sql, result);
                    return convertQueryResult(result, wrapper);
                }));
    }
    /**
     * Maps a REST query result ({@code column_meta} + {@code data}) through the
     * wrapper, emitting one element per row; empty when there is no data.
     */
    protected <E> Flux<E> convertQueryResult(JSONObject result, ResultWrapper<E, ?> wrapper) {
        JSONArray head = result.getJSONArray("column_meta");
        JSONArray data = result.getJSONArray("data");
        if (CollectionUtils.isEmpty(head) || CollectionUtils.isEmpty(data)) {
            return Flux.empty();
        }
        // the first element of each column_meta entry is the column name
        List<String> columns = head.stream()
            .map(v -> ((JSONArray) v).getString(0))
            .collect(Collectors.toList());
        return Flux.create(sink -> {
            wrapper.beforeWrap(() -> columns);
            for (Object rowo : data) {
                E rowInstance = wrapper.newRowInstance();
                JSONArray row = (JSONArray) rowo;
                for (int i = 0; i < columns.size(); i++) {
                    String property = columns.get(i);
                    Object value = row.get(i);
                    DefaultColumnWrapperContext<E> context = new DefaultColumnWrapperContext<>(i, property, value, rowInstance);
                    wrapper.wrapColumn(context);
                    rowInstance = context.getRowInstance();
                }
                if (!wrapper.completedWrapRow(rowInstance)) {
                    break; // wrapper requested early termination
                }
                if (rowInstance != null) {
                    sink.next(rowInstance);
                }
            }
            wrapper.completedWrap();
            sink.complete();
        });
    }
}

View File

@ -0,0 +1,108 @@
package org.jetlinks.community.tdengine.restful;
import io.netty.buffer.ByteBufAllocator;
import lombok.AllArgsConstructor;
import org.jetlinks.community.tdengine.TDEngineDataWriter;
import org.jetlinks.community.tdengine.TDengineProperties;
import org.jetlinks.community.buffer.BufferSettings;
import org.jetlinks.community.buffer.PersistenceBuffer;
import org.jetlinks.community.tdengine.Point;
import org.jetlinks.community.tdengine.TDEngineUtils;
import org.jetlinks.community.utils.ErrorUtils;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientException;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * Writes points using TDengine's schemaless InfluxDB line-protocol endpoint
 * ({@code /influxdb/v1/write}). When buffering is enabled, single-point writes
 * go through a persistent on-disk buffer that retries transient network errors.
 */
// NOTE(review): @AllArgsConstructor also generates a (client, database, buffer)
// constructor that bypasses the explicit constructor's buffer.start() — verify
// it is unused, or remove the annotation.
@AllArgsConstructor
public class SchemalessTDEngineDataWriter implements TDEngineDataWriter, Disposable {
    private final WebClient client;
    private final String database;
    private final DataBufferFactory factory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
    // null when buffering is disabled; then writes go straight to the server
    private final PersistenceBuffer<String> buffer;
    public SchemalessTDEngineDataWriter(WebClient client, String database, TDengineProperties.Buffer buffer) {
        this.client = client;
        this.database = database;
        if (buffer.isEnabled()) {
            // lines are persisted locally and flushed in batches; web/IO errors trigger a retry
            this.buffer = new PersistenceBuffer<String>(
                BufferSettings.create("tdengine-writer.queue", buffer),
                null,
                list -> writeNow(list).thenReturn(false))
                .name("tdengine")
                .parallelism(buffer.getParallelism())
                .retryWhenError(e -> ErrorUtils.hasException(e, WebClientException.class)
                    || ErrorUtils.hasException(e, IOException.class));
            this.buffer.start();
        } else {
            this.buffer = null;
        }
    }
    /** Stops the write buffer, if one was created. */
    @Override
    public void dispose() {
        if (null != buffer) {
            buffer.dispose();
        }
    }
    /** Queues the point in the buffer when enabled, otherwise writes it immediately. */
    @Override
    public Mono<Void> write(Point point) {
        if (buffer == null) {
            return writeNow(Flux.just(convertToLine(point)));
        }
        buffer.write(convertToLine(point));
        return Mono.empty();
    }
    // NOTE(review): bulk writes bypass the buffer even when it is enabled —
    // presumably intentional for streaming inserts; confirm.
    @Override
    public Mono<Void> write(Flux<Point> points) {
        return writeNow(points.map(this::convertToLine));
    }
    private static final byte[] newLine = "\n".getBytes();
    /**
     * POSTs newline-delimited line protocol to {@code /influxdb/v1/write?db=...}
     * and validates the response via {@link TDEngineUtils#checkExecuteResult}.
     * NOTE(review): getBytes() uses the platform default charset — UTF-8 should
     * probably be explicit for non-ASCII tag/field values.
     */
    private Mono<Void> writeNow(Flux<String> lines) {
        return client
            .post()
            .uri(builder -> builder
                .path("/influxdb/v1/write")
                .queryParam("db", database)
                .build())
            .body(lines
                .map(str -> {
                    byte[] data = str.getBytes();
                    return factory
                        .allocateBuffer(data.length + newLine.length)
                        .write(data)
                        .write(newLine);
                })
                , DataBuffer.class)
            .exchangeToMono(TDEngineUtils::checkExecuteResult)
            .then();
    }
    // renders a Point via influxdb-java's line protocol with millisecond timestamps
    private String convertToLine(Point point) {
        return org.influxdb.dto.Point.measurement(point.getMetric())
            .tag((Map) point.getTags())
            .fields(point.getValues())
            .time(point.getTimestamp(), TimeUnit.MILLISECONDS)
            .build()
            .lineProtocol();
    }
}

View File

@ -0,0 +1,35 @@
package org.jetlinks.community.tdengine.term;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.AbstractTermsFragmentBuilder;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
 * Renders ezorm {@link Term} trees into a TDengine WHERE-clause fragment.
 * Returns an empty string when there are no (effective) terms.
 */
public class TDengineQueryConditionBuilder extends AbstractTermsFragmentBuilder<Object> {

    public static String build(List<Term> terms) {
        if (CollectionUtils.isEmpty(terms)) {
            return "";
        }
        SqlFragments fragments = new TDengineQueryConditionBuilder().createTermFragments(null, terms);
        if (fragments.isEmpty()) {
            return "";
        }
        return fragments.toRequest().toString();
    }

    @Override
    protected SqlFragments createTermFragments(Object parameter, Term term) {
        String type = term.getTermType();
        // Locale.ROOT avoids locale-dependent lowercasing (e.g. the Turkish
        // dotless i) breaking the enum lookup for term types like "IS" or "IN"
        TDengineTermType termType = TDengineTermType.valueOf(type.toLowerCase(java.util.Locale.ROOT));
        return SqlFragments.single(termType.build("`" + term.getColumn() + "`", term.getValue()));
    }
}

View File

@ -0,0 +1,138 @@
package org.jetlinks.community.tdengine.term;
import lombok.AllArgsConstructor;
import org.springframework.util.StringUtils;
import java.util.*;
import java.util.stream.Collectors;
@AllArgsConstructor
public enum TDengineTermType {
is(true, "="),
eq(true, "="),
not(true, "!="),
notnull(false, "!=") {
@Override
protected void doBuild(String column, Object value, StringJoiner sql) {
String val = String.valueOf(value);
sql.add(escapeColumn(column))
.add(" is not null ");
}
},
gt(true, ">"),
gte(true, ">="),
lt(true, "<"),
lte(true, "<="),
like(false, "like") {
@Override
protected void doBuild(String column, Object value, StringJoiner sql) {
String val = String.valueOf(value);
sql.add(escapeColumn(column))
.add(" like ").add(val);
}
},
btw(true, "btw") {
@Override
protected void doBuild(String column, Object value, StringJoiner sql) {
List<Object> values = new ArrayList<>(convertList(value));
if (values.isEmpty()) {
return;
}
gte.build(column, values.get(0), sql);
if (values.size() >= 2) {
sql.add(" and ");
lte.build(column, values.get(1), sql);
}
}
},
in(false, "in") {
@Override
protected void doBuild(String column, Object value, StringJoiner sql) {
String colSql = escapeColumn(column);
sql.add(colSql)
.add(" in ")
.add(convertList(value)
.stream()
.map(this::createValue)
.collect(Collectors.joining(" , ", "(", ")")));
}
},
nin(false, "nin") {
@Override
protected void doBuild(String column, Object value, StringJoiner sql) {
String colSql = escapeColumn(column);
sql.add(colSql)
.add(" not in ")
.add(convertList(value)
.stream()
.map(this::createValue)
.collect(Collectors.joining(" , ", "(", ")")));
}
};
final boolean forNumber;
final String expr;
public static Collection<Object> convertList(Object value) {
if (value == null) {
return Collections.emptyList();
}
if (value instanceof String) {
value = ((String) value).split("[,]");
}
if (value instanceof Object[]) {
value = Arrays.asList(((Object[]) value));
}
if (value instanceof Collection) {
return ((Collection<Object>) value);
}
return Collections.singletonList(value);
}
protected String escapeValue(String value) {
return value.replace("'", "\\'");
}
protected String escapeColumn(String value) {
return value;
}
protected String createValue(Object value) {
String strVal = escapeValue(value.toString());
if (value instanceof Number || value instanceof Boolean) {
return value.toString();
} else if (strVal.startsWith("'") && strVal.endsWith("'")) {
return strVal;
} else {
return "'" + strVal + "'";
}
}
protected void doBuild(String column, Object value, StringJoiner sql) {
sql.add(escapeColumn(column))
.add(" ")
.add(expr)
.add(" ").add(createValue(value));
}
public String build(String column, Object value) {
StringJoiner joiner = new StringJoiner("");
build(column, value, joiner);
return joiner.toString();
}
// Appends this operator's SQL to `sql`; blank columns and null values are
// silently skipped (such terms simply produce no condition).
public void build(String column, Object value, StringJoiner sql) {
    if (StringUtils.isEmpty(column) || value == null) {
        return;
    }
    doBuild(column, value, sql);
}
}

View File

@ -0,0 +1,37 @@
package org.jetlinks.community.tdengine.things;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.community.things.data.operations.ColumnModeDDLOperationsBase;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
import reactor.core.publisher.Mono;
import java.util.List;
/**
 * Column-mode DDL operations backed by TDengine. DDL here is metadata-only:
 * the metric's property columns are recorded in the shared metadata manager,
 * which the save/query operations later consult.
 */
class TDengineColumnModeDDLOperations extends ColumnModeDDLOperationsBase {

    private final TDengineThingDataHelper helper;

    public TDengineColumnModeDDLOperations(String thingType,
                                           String templateId,
                                           String thingId,
                                           DataSettings settings,
                                           MetricBuilder metricBuilder,
                                           TDengineThingDataHelper helper) {
        super(thingType, templateId, thingId, settings, metricBuilder);
        this.helper = helper;
    }

    @Override
    protected Mono<Void> register(MetricType metricType, String metric, List<PropertyMetadata> properties) {
        helper.metadataManager.register(metric, properties);
        return Mono.empty();
    }

    @Override
    protected Mono<Void> reload(MetricType metricType, String metric, List<PropertyMetadata> properties) {
        // Reloading is identical to re-registering; delegate to avoid duplication.
        return register(metricType, metric, properties);
    }
}

View File

@ -0,0 +1,103 @@
package org.jetlinks.community.tdengine.things;
import org.hswebframework.ezorm.core.dsl.Query;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrappers;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.things.data.AggregationRequest;
import org.jetlinks.community.things.data.PropertyAggregation;
import org.jetlinks.community.things.data.operations.ColumnModeQueryOperationsBase;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.query.AggregationData;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Map;
import java.util.StringJoiner;
import java.util.function.Function;
/**
 * Column-mode query operations backed by TDengine (one table column per
 * property). Plain queries and paging are delegated to
 * {@link TDengineThingDataHelper}; aggregation SQL is rendered here.
 */
class TDengineColumnModeQueryOperations extends ColumnModeQueryOperationsBase {

    final TDengineThingDataHelper helper;

    public TDengineColumnModeQueryOperations(String thingType,
                                             String thingTemplateId,
                                             String thingId,
                                             MetricBuilder metricBuilder,
                                             DataSettings settings,
                                             ThingsRegistry registry,
                                             TDengineThingDataHelper helper) {
        super(thingType, thingTemplateId, thingId, metricBuilder, settings, registry);
        this.helper = helper;
    }

    @Override
    protected Flux<TimeSeriesData> doQuery(String metric, Query<?, QueryParamEntity> query) {
        return helper.doQuery(metric, query);
    }

    @Override
    protected <T> Mono<PagerResult<T>> doQueryPage(String metric,
                                                   Query<?, QueryParamEntity> query,
                                                   Function<TimeSeriesData, T> mapper) {
        return helper.doQueryPage(metric, query, mapper);
    }

    /**
     * Renders and runs an aggregation statement of the shape:
     * {@code select last(`_ts`) _ts, agg(`prop`) `alias`, ... from `metric`
     * where ... [interval(...)] order by ...}.
     */
    @Override
    protected Flux<AggregationData> doAggregation(String metric,
                                                  AggregationRequest request,
                                                  AggregationContext context) {
        StringJoiner joiner = new StringJoiner("", "select ", "");
        // last(_ts) gives each result row / bucket a representative timestamp.
        joiner.add("last(`_ts`) _ts");
        for (PropertyAggregation property : context.getProperties()) {
            joiner.add(",");
            joiner.add(TDengineThingDataHelper.convertAggFunction(property))
                  .add("(`").add(property.getProperty()).add("`)")
                  .add(" `").add(property.getAlias()).add("`");
        }
        // Caller filter plus the requested [from, to] time window on `_ts`.
        joiner
            .add(" from `").add(metric).add("` ")
            .add(helper.buildWhere(
                metric,
                request.getFilter().clone().and("_ts", "btw", Arrays.asList(request.getFrom(), request.getTo())))
            );
        if (request.getInterval() != null) {
            // Time-bucketed aggregation via TDengine's interval(...) clause.
            joiner.add(" ")
                  .add(TDengineThingDataHelper.getGroupByTime(request.getInterval()));
        }
        joiner.add(TDengineThingDataHelper.buildOrderBy(request.getFilter()));
        // NOTE(review): assumes request.getFormat() is never null — confirm
        // AggregationRequest supplies a default pattern.
        String format = request.getFormat();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
        return helper
            .operations
            .forQuery()
            .query(joiner.toString(), ResultWrappers.map())
            .map(map -> {
                TimeSeriesData timeSeriesData = TDengineThingDataHelper.convertToTsData(map);
                long ts = timeSeriesData.getTimestamp();
                Map<String, Object> newData = timeSeriesData.getData();
                // Properties with no samples in a bucket default to 0.
                for (PropertyAggregation property : context.getProperties()) {
                    newData.putIfAbsent(property.getAlias(), 0);
                }
                newData.put("time", formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault())));
                return AggregationData.of(newData);
            })
            .take(request.getLimit());
    }
}

View File

@ -0,0 +1,61 @@
package org.jetlinks.community.tdengine.things;
import org.jetlinks.core.message.ThingMessage;
import org.jetlinks.core.things.ThingMetadata;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.things.data.ThingsDataConstants;
import org.jetlinks.community.things.data.operations.ColumnModeSaveOperationsBase;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
import org.jetlinks.community.timeseries.TimeSeriesData;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.*;
/**
 * Column-mode save operations: each property is written as a column of the
 * metric's TDengine table, through {@link TDengineThingDataHelper}.
 */
class TDengineColumnModeSaveOperations extends ColumnModeSaveOperationsBase {

    private final TDengineThingDataHelper helper;

    // Columns stripped before writing; the row timestamp is carried by
    // TimeSeriesData itself. Made final + unmodifiable so shared state cannot
    // be mutated accidentally (was a plain mutable HashSet).
    static final Set<String> IGNORE_COLUMN = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
        ThingsDataConstants.COLUMN_ID,
        ThingsDataConstants.COLUMN_TIMESTAMP
    )));

    public TDengineColumnModeSaveOperations(ThingsRegistry registry,
                                            MetricBuilder metricBuilder,
                                            DataSettings settings,
                                            TDengineThingDataHelper helper) {
        super(registry, metricBuilder, settings);
        this.helper = helper;
    }

    @Override
    protected String createPropertyDataId(ThingMessage message) {
        return message.getMessageId();
    }

    @Override
    protected Map<String, Object> handlePropertiesData(ThingMetadata metadata, Map<String, Object> properties) {
        properties = super.handlePropertiesData(metadata, properties);
        // Strip columns that must not become TDengine fields.
        IGNORE_COLUMN.forEach(properties::remove);
        return properties;
    }

    // The thing-id column is stored as a TDengine tag; everything else is a field.
    protected boolean isTagValue(String metric,
                                 String key,
                                 Object value) {
        return Objects.equals(metricBuilder.getThingIdProperty(), key);
    }

    @Override
    protected Mono<Void> doSave(String metric, TimeSeriesData data) {
        return helper.doSave(metric, data, this::isTagValue);
    }

    @Override
    protected Mono<Void> doSave(String metric, Flux<TimeSeriesData> data) {
        return helper.doSave(metric, data, this::isTagValue);
    }
}

View File

@ -0,0 +1,65 @@
package org.jetlinks.community.tdengine.things;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.things.data.AbstractThingDataRepositoryStrategy;
import org.jetlinks.community.things.data.operations.DDLOperations;
import org.jetlinks.community.things.data.operations.QueryOperations;
import org.jetlinks.community.things.data.operations.SaveOperations;
/**
 * Storage strategy "tdengine-column": thing data persisted in TDengine with
 * one table column per property. Factory methods wire the column-mode
 * save/query/DDL operations against a shared helper.
 */
public class TDengineColumnModeStrategy extends AbstractThingDataRepositoryStrategy {

    private final ThingsRegistry registry;
    private final TDengineThingDataHelper helper;

    public TDengineColumnModeStrategy(ThingsRegistry registry, TDengineThingDataHelper helper) {
        this.registry = registry;
        this.helper = helper;
    }

    @Override
    public String getId() {
        return "tdengine-column";
    }

    @Override
    public String getName() {
        return "TDEngine-列式存储";
    }

    @Override
    public int getOrder() {
        return 10220;
    }

    @Override
    public SaveOperations createOpsForSave(OperationsContext context) {
        return new TDengineColumnModeSaveOperations(
            registry, context.getMetricBuilder(), context.getSettings(), helper);
    }

    @Override
    protected DDLOperations createForDDL(String thingType, String templateId, String thingId, OperationsContext context) {
        return new TDengineColumnModeDDLOperations(
            thingType, templateId, thingId,
            context.getSettings(), context.getMetricBuilder(), helper);
    }

    @Override
    protected QueryOperations createForQuery(String thingType, String templateId, String thingId, OperationsContext context) {
        return new TDengineColumnModeQueryOperations(
            thingType, templateId, thingId,
            context.getMetricBuilder(), context.getSettings(), registry, helper);
    }
}

View File

@ -0,0 +1,37 @@
package org.jetlinks.community.tdengine.things;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
import org.jetlinks.community.things.data.operations.RowModeDDLOperationsBase;
import reactor.core.publisher.Mono;
import java.util.List;
/**
 * Row-mode DDL operations backed by TDengine. DDL here is metadata-only:
 * the metric's property columns are recorded in the shared metadata manager,
 * which the save/query operations later consult.
 */
class TDengineRowModeDDLOperations extends RowModeDDLOperationsBase {

    private final TDengineThingDataHelper helper;

    public TDengineRowModeDDLOperations(String thingType,
                                        String templateId,
                                        String thingId,
                                        DataSettings settings,
                                        MetricBuilder metricBuilder,
                                        TDengineThingDataHelper helper) {
        super(thingType, templateId, thingId, settings, metricBuilder);
        this.helper = helper;
    }

    @Override
    protected Mono<Void> register(MetricType metricType, String metric, List<PropertyMetadata> properties) {
        helper.metadataManager.register(metric, properties);
        return Mono.empty();
    }

    @Override
    protected Mono<Void> reload(MetricType metricType, String metric, List<PropertyMetadata> properties) {
        // Reloading is identical to re-registering; delegate to avoid duplication.
        return register(metricType, metric, properties);
    }
}

View File

@ -0,0 +1,156 @@
package org.jetlinks.community.tdengine.things;
import org.hswebframework.ezorm.core.dsl.Query;
import org.hswebframework.ezorm.core.param.TermType;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.things.ThingMetadata;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.things.data.AggregationRequest;
import org.jetlinks.community.things.data.PropertyAggregation;
import org.jetlinks.community.things.data.ThingPropertyDetail;
import org.jetlinks.community.things.data.ThingsDataConstants;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
import org.jetlinks.community.things.data.operations.RowModeQueryOperationsBase;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.query.Aggregation;
import org.jetlinks.community.timeseries.query.AggregationData;
import org.jetlinks.reactor.ql.utils.CastUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.function.Function;
/**
 * Row-mode query operations backed by TDengine: one row per (property,
 * timestamp) with `value`/`numberValue` columns. Plain queries are delegated
 * to {@link TDengineThingDataHelper}; aggregation SQL is rendered here with
 * one partition per property.
 */
class TDengineRowModeQueryOperations extends RowModeQueryOperationsBase {

    final TDengineThingDataHelper helper;

    public TDengineRowModeQueryOperations(String thingType,
                                          String thingTemplateId,
                                          String thingId,
                                          MetricBuilder metricBuilder,
                                          DataSettings settings,
                                          ThingsRegistry registry,
                                          TDengineThingDataHelper helper) {
        super(thingType, thingTemplateId, thingId, metricBuilder, settings, registry);
        this.helper = helper;
    }

    @Override
    protected Flux<TimeSeriesData> doQuery(String metric, Query<?, QueryParamEntity> query) {
        return helper.doQuery(metric, query);
    }

    @Override
    protected <T> Mono<PagerResult<T>> doQueryPage(String metric,
                                                   Query<?, QueryParamEntity> query,
                                                   Function<TimeSeriesData, T> mapper) {
        return helper.doQueryPage(metric, query, mapper);
    }

    // NOTE(review): forwards to super unchanged — this override can be removed.
    @Override
    protected Flux<ThingPropertyDetail> queryEachProperty(@Nonnull String metric,
                                                          @Nonnull Query<?, QueryParamEntity> query,
                                                          @Nonnull ThingMetadata metadata,
                                                          @Nonnull Map<String, PropertyMetadata> properties) {
        return super.queryEachProperty(metric,query,metadata,properties);
    }

    /**
     * Aggregates row-mode property data. Rows are partitioned by the
     * `property` column so each property aggregates independently; COUNT reads
     * the `value` column while other aggregates read `numberValue`.
     */
    @Override
    protected Flux<AggregationData> doAggregation(String metric,
                                                  AggregationRequest request,
                                                  AggregationContext context) {
        PropertyAggregation[] properties = context.getProperties();
        // Select list: property id, bucket timestamp, one aggregate per property.
        StringJoiner agg = new StringJoiner("");
        agg.add("property,last(`_ts`) _ts");
        for (PropertyAggregation property : properties) {
            agg.add(",");
            agg.add(TDengineThingDataHelper.convertAggFunction(property));
            if(property.getAgg()== Aggregation.COUNT){
                agg .add("(`value`)");
            }else {
                agg .add("(`numberValue`)");
            }
            // Alias each aggregate as value_<alias> for later extraction.
            agg.add(" `").add("value_" + property.getAlias()).add("`");
        }
        // From/where: restrict to the requested properties and time window.
        String sql = String.join(
            "",
            "`", metric, "` ",
            helper.buildWhere(metric,
                request
                    .getFilter()
                    .clone()
                    .and("property", TermType.in, context.getPropertyAlias().values())
                    .and("_ts", TermType.btw, Arrays.asList(request.getFrom(), request.getTo()))
            )
        );
        String dataSql = "select " + agg + " from " + sql + " partition by property";
        if (request.getInterval() != null) {
            dataSql += " ";
            dataSql += TDengineThingDataHelper.getGroupByTime(request.getInterval());
        }
        // NOTE(review): assumes request.getFormat() is never null — confirm upstream default.
        String format = request.getFormat();
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
        if (properties.length == 1) {
            // Single property: one result row per bucket, newest first.
            String key = "value_" + properties[0].getAlias();
            return helper
                .query(dataSql)
                .sort(Comparator.comparing(TimeSeriesData::getTimestamp).reversed())
                .map(timeSeriesData -> {
                    long ts = timeSeriesData.getTimestamp();
                    Map<String, Object> newData = new HashMap<>();
                    newData.put("time", formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId
                        .systemDefault())));
                    newData.put(properties[0].getAlias(), timeSeriesData.get(key).orElse(0));
                    return AggregationData.of(newData);
                })
                .take(request.getLimit())
                ;
        }
        // Multiple properties: rows arrive per-property (partition by), so merge
        // rows that share the same formatted time bucket into one result map.
        return helper
            .query(dataSql)
            .map(timeSeriesData -> {
                long ts = timeSeriesData.getTimestamp();
                Map<String, Object> newData = timeSeriesData.getData();
                newData.put("time", formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault())));
                // Keep the raw timestamp for final sorting; removed below.
                newData.put("_time", ts);
                return newData;
            })
            .groupBy(data -> (String) data.get("time"), Integer.MAX_VALUE)
            .flatMap(group -> group
                .reduceWith(HashMap::new, (a, b) -> {
                    a.putAll(b);
                    return a;
                })
                .map(map -> {
                    Map<String, Object> newResult = new HashMap<>();
                    for (PropertyAggregation property : properties) {
                        String key = "value_" + property.getAlias();
                        // Missing properties in a bucket default to 0.
                        newResult.put(property.getAlias(), Optional.ofNullable(map.get(key)).orElse(0));
                    }
                    newResult.put("time", group.key());
                    newResult.put("_time", map.getOrDefault("_time", new Date()));
                    return AggregationData.of(newResult);
                }))
            .sort(Comparator
                .<AggregationData, Date>comparing(data -> CastUtils.castDate(data.values().get("_time")))
                .reversed())
            .doOnNext(data -> data.values().remove("_time"))
            .take(request.getLimit());
    }
}

View File

@ -0,0 +1,69 @@
package org.jetlinks.community.tdengine.things;
import org.jetlinks.core.message.ThingMessage;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.things.data.ThingsDataConstants;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
import org.jetlinks.community.things.data.operations.RowModeSaveOperationsBase;
import org.jetlinks.community.timeseries.TimeSeriesData;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.*;
/**
 * Row-mode save operations: each property value is written as its own row,
 * through {@link TDengineThingDataHelper}.
 */
class TDengineRowModeSaveOperations extends RowModeSaveOperationsBase {

    private final TDengineThingDataHelper helper;

    // Columns stripped before writing (complex value columns are not stored
    // in row mode; the row timestamp is carried by TimeSeriesData). Made
    // final + unmodifiable so shared state cannot be mutated accidentally
    // (was a plain mutable HashSet).
    static final Set<String> IGNORE_COLUMN = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
        ThingsDataConstants.COLUMN_ID,
        ThingsDataConstants.COLUMN_PROPERTY_OBJECT_VALUE,
        ThingsDataConstants.COLUMN_PROPERTY_ARRAY_VALUE,
        ThingsDataConstants.COLUMN_PROPERTY_GEO_VALUE,
        ThingsDataConstants.COLUMN_PROPERTY_TIME_VALUE,
        ThingsDataConstants.COLUMN_TIMESTAMP
    )));

    public TDengineRowModeSaveOperations(ThingsRegistry registry,
                                         MetricBuilder metricBuilder,
                                         DataSettings settings,
                                         TDengineThingDataHelper helper) {
        super(registry, metricBuilder, settings);
        this.helper = helper;
    }

    // Thing-id and property-id columns are stored as TDengine tags; everything
    // else becomes a field.
    protected boolean isTagValue(String metric,
                                 String key,
                                 Object value) {
        return Objects.equals(metricBuilder.getThingIdProperty(), key)
            || Objects.equals(ThingsDataConstants.COLUMN_PROPERTY_ID, key);
    }

    @Override
    protected String createPropertyDataId(String property, ThingMessage message, long timestamp) {
        // Timestamp-only id; the id column itself is dropped before write
        // (see IGNORE_COLUMN), so uniqueness across things is not required here.
        return String.valueOf(timestamp);
    }

    @Override
    protected Map<String, Object> createRowPropertyData(String id,
                                                        long timestamp,
                                                        ThingMessage message,
                                                        PropertyMetadata property,
                                                        Object value) {
        Map<String, Object> data = super.createRowPropertyData(id, timestamp, message, property, value);
        IGNORE_COLUMN.forEach(data::remove);
        return data;
    }

    @Override
    protected Mono<Void> doSave(String metric, TimeSeriesData data) {
        return helper.doSave(metric, data, this::isTagValue);
    }

    @Override
    protected Mono<Void> doSave(String metric, Flux<TimeSeriesData> data) {
        return helper.doSave(metric, data, this::isTagValue);
    }
}

View File

@ -0,0 +1,78 @@
package org.jetlinks.community.tdengine.things;
import org.jetlinks.core.metadata.Feature;
import org.jetlinks.core.metadata.MetadataFeature;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.things.data.AbstractThingDataRepositoryStrategy;
import org.jetlinks.community.things.data.operations.DDLOperations;
import org.jetlinks.community.things.data.operations.QueryOperations;
import org.jetlinks.community.things.data.operations.SaveOperations;
import reactor.core.publisher.Flux;
/**
 * Storage strategy "tdengine-row": thing data persisted in TDengine with one
 * row per property value. Factory methods wire the row-mode save/query/DDL
 * operations against a shared helper.
 */
public class TDengineRowModeStrategy extends AbstractThingDataRepositoryStrategy {

    private final ThingsRegistry registry;
    private final TDengineThingDataHelper helper;

    public TDengineRowModeStrategy(ThingsRegistry registry, TDengineThingDataHelper helper) {
        this.registry = registry;
        this.helper = helper;
    }

    @Override
    public String getId() {
        return "tdengine-row";
    }

    @Override
    public String getName() {
        return "TDengine-行式存储";
    }

    @Override
    public int getOrder() {
        return 10210;
    }

    @Override
    public Flux<Feature> getFeatures() {
        // Events cannot be inserted or modified through this strategy.
        return Flux.just(
            MetadataFeature.eventNotInsertable,
            MetadataFeature.eventNotModifiable
        );
    }

    @Override
    public SaveOperations createOpsForSave(OperationsContext context) {
        return new TDengineRowModeSaveOperations(
            registry, context.getMetricBuilder(), context.getSettings(), helper);
    }

    @Override
    protected DDLOperations createForDDL(String thingType, String templateId, String thingId, OperationsContext context) {
        return new TDengineRowModeDDLOperations(
            thingType, templateId, thingId,
            context.getSettings(), context.getMetricBuilder(), helper);
    }

    @Override
    protected QueryOperations createForQuery(String thingType, String templateId, String thingId, OperationsContext context) {
        return new TDengineRowModeQueryOperations(
            thingType, templateId, thingId,
            context.getMetricBuilder(), context.getSettings(), registry, helper);
    }
}

View File

@ -0,0 +1,42 @@
package org.jetlinks.community.tdengine.things;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.tdengine.TDengineConfiguration;
import org.jetlinks.community.tdengine.TDengineOperations;
import org.jetlinks.community.things.data.DefaultMetricMetadataManager;
import org.jetlinks.community.things.data.ThingsDataRepositoryStrategy;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
/**
 * Auto-configuration that wires the TDengine-backed thing-data storage
 * strategies. Only active when a {@code TDengineOperations} bean exists.
 */
@AutoConfiguration(after = TDengineConfiguration.class)
@ConditionalOnClass(ThingsDataRepositoryStrategy.class)
@ConditionalOnBean(TDengineOperations.class)
public class TDengineThingDataConfiguration {

    // Shared helper; disposed with the context to release TDengine resources.
    @Bean(destroyMethod = "dispose")
    public TDengineThingDataHelper tDengineThingDataHelper(TDengineOperations operations) {
        return new TDengineThingDataHelper(operations, new DefaultMetricMetadataManager());
    }

    @Bean
    public TDengineColumnModeStrategy tDengineColumnModeStrategy(ThingsRegistry registry,
                                                                 TDengineThingDataHelper helper) {
        return new TDengineColumnModeStrategy(registry, helper);
    }

    @Bean
    public TDengineRowModeStrategy tDengineRowModeStrategy(ThingsRegistry registry,
                                                           TDengineThingDataHelper helper) {
        return new TDengineRowModeStrategy(registry, helper);
    }
}

View File

@ -0,0 +1,274 @@
package org.jetlinks.community.tdengine.things;
import lombok.AllArgsConstructor;
import org.hswebframework.ezorm.core.dsl.Query;
import org.hswebframework.ezorm.core.param.Sort;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.ezorm.core.param.TermType;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrappers;
import org.hswebframework.utils.time.DateFormatter;
import org.hswebframework.utils.time.DefaultDateFormatter;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.hswebframework.web.exception.BusinessException;
import org.jetlinks.community.tdengine.term.TDengineQueryConditionBuilder;
import org.jetlinks.core.metadata.Converter;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.community.Interval;
import org.jetlinks.community.tdengine.Point;
import org.jetlinks.community.tdengine.TDengineOperations;
import org.jetlinks.community.things.data.MetricMetadataManager;
import org.jetlinks.community.things.data.PropertyAggregation;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.utils.ConverterUtils;
import org.jetlinks.community.utils.ObjectMappers;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.function.Predicate3;
import java.util.*;
import java.util.function.Function;
import java.util.regex.Pattern;
/**
 * Shared helper for TDengine-backed thing-data storage: builds query and
 * aggregation SQL, coerces condition values using registered metric metadata,
 * and converts between {@link TimeSeriesData} and write {@link Point}s.
 */
@AllArgsConstructor
class TDengineThingDataHelper implements Disposable {

    static {
        // Teach the shared DateFormatter to parse ISO-8601-like timestamps
        // with milliseconds and a zone suffix.
        DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
    }

    // Low-level TDengine query/write gateway.
    final TDengineOperations operations;
    // Per-metric column metadata, used to convert condition values to column types.
    final MetricMetadataManager metadataManager;

    // Maps the aggregation enum to a TDengine aggregate function name.
    // Assumes every non-NONE enum name, lower-cased, is a valid TDengine
    // function — TODO confirm for all Aggregation values.
    public static String convertAggFunction(PropertyAggregation agg) {
        switch (agg.getAgg()) {
            case NONE:
                throw new BusinessException("error.unsupported_aggregation_condition", 500, agg);
            default:
                return agg.getAgg().name().toLowerCase();
        }
    }

    // True when the term type expects a list value (between / in and negations).
    // NOTE(review): the `type` parameter is currently unused.
    public static boolean isArrayTerm(DataType type, Term term) {
        String termType = term.getTermType().toLowerCase();
        return TermType.btw.equals(termType)
            || TermType.nbtw.equals(termType)
            || TermType.in.equals(termType)
            || TermType.nin.equals(termType);
    }

    // Renders the time-bucket clause, e.g. interval(1h).
    public static String getGroupByTime(Interval interval) {
        return "interval(" + interval.toString() + ")";
    }

    // Converts a term value to a list, converting each element to the column's
    // type when the type supports conversion.
    public static Object tryConvertList(DataType type, Term term) {
        return ConverterUtils
            .tryConvertToList(term.getValue(), val -> {
                if (type instanceof Converter) {
                    return ((Converter<?>) type).convert(val);
                }
                return val;
            });
    }

    /**
     * Pre-processes query terms (recursively): normalizes timestamp columns to
     * `_ts` with epoch-millisecond values, and coerces other values to their
     * column's type using the registered metric metadata.
     */
    public List<Term> prepareTerms(String metric, List<Term> terms) {
        if (CollectionUtils.isEmpty(terms)) {
            return terms;
        }
        for (Term term : terms) {
            // Both "timestamp" and "_ts" address the TDengine row timestamp.
            if (("timestamp".equals(term.getColumn()) || "_ts".equals(term.getColumn())) && term.getValue() != null) {
                term.setColumn("_ts");
                term.setValue(prepareTimestampValue(term.getValue(), term.getTermType()));
            } else {
                metadataManager
                    .getColumn(metric, term.getColumn())
                    .ifPresent(meta -> {
                        DataType type = meta.getValueType();
                        if (isArrayTerm(type, term)) {
                            term.setValue(tryConvertList(type, term));
                        } else if (type instanceof Converter) {
                            term.setValue(((Converter<?>) type).convert(term.getValue()));
                        }
                    });
            }
            // Nested terms are processed the same way.
            term.setTerms(prepareTerms(metric, term.getTerms()));
        }
        return terms;
    }

    // Converts a timestamp condition value (single or list) to epoch milliseconds.
    // NOTE(review): the `type` parameter is unused.
    public static Object prepareTimestampValue(Object value, String type) {
        return ConverterUtils.tryConvertToList(value,v->{
            Date date = CastUtils.castDate(v);
            return date.getTime();
        });
    }

    // Order-by clause: only timestamp sorting is supported; the first matching
    // sort wins, defaulting to newest-first.
    public static String buildOrderBy(QueryParamEntity param) {
        for (Sort sort : param.getSorts()) {
            if (sort.getName().equalsIgnoreCase("timestamp")) {
                return " order by `_ts` " + sort.getOrder();
            }
        }
        return " order by `_ts` desc";
    }

    /**
     * Renders the where clause from the (pre-processed) query terms plus any
     * extra raw conditions joined with "and". Returns "" when no condition
     * was produced.
     */
    public String buildWhere(String metric, QueryParamEntity param, String... and) {
        StringJoiner joiner = new StringJoiner(" ", "where ", "");
        String sql = TDengineQueryConditionBuilder.build(prepareTerms(metric, param.getTerms()));
        if (StringUtils.hasText(sql)) {
            joiner.add(sql);
        }
        if (StringUtils.hasText(sql) && and.length > 0) {
            joiner.add("and");
        }
        for (int i = 0; i < and.length; i++) {
            if (i > 0) {
                joiner.add("and");
            }
            joiner.add(and[i]);
        }
        // 6 == "where ".length(): nothing was added, so emit no clause at all.
        return joiner.length() == 6 ? "" : joiner.toString();
    }

    // Runs raw SQL and converts each result row to TimeSeriesData.
    public Flux<TimeSeriesData> query(String sql) {
        return operations
            .forQuery()
            .query(sql, ResultWrappers.map())
            .mapNotNull(TDengineThingDataHelper::convertToTsData);
    }

    // Renders and runs `select * from metric where ... order by ... [limit/offset]`.
    protected Flux<TimeSeriesData> doQuery(String metric, Query<?, QueryParamEntity> query) {
        QueryParamEntity param = query.getParam();
        StringJoiner joiner = new StringJoiner("");
        joiner.add("select * from")
              .add(" `")
              .add(metric)
              .add("` ")
              .add(buildWhere(metric, param))
              .add(buildOrderBy(param));
        if (param.isPaging()) {
            joiner.add(" limit ").add(String.valueOf(param.getPageSize()))
                  .add(" offset ")
                  .add(String.valueOf(param.getPageSize() * param.getPageIndex()));
        }
        return operations
            .forQuery()
            .query(joiner.toString(), ResultWrappers.map())
            .mapNotNull(TDengineThingDataHelper::convertToTsData);
    }

    // Pulls the `_ts` column out of a result row as the data's timestamp.
    public static TimeSeriesData convertToTsData(Map<String, Object> map) {
        Date ts = convertTs(map.remove("_ts"));
        return TimeSeriesData.of(ts, map);
    }

    // Runs a count query and a data query and zips them into one page result.
    protected <T> Mono<PagerResult<T>> doQueryPage(String metric,
                                                   Query<?, QueryParamEntity> query,
                                                   Function<TimeSeriesData, T> mapper) {
        QueryParamEntity param = query.getParam();
        String sql = "`" + metric + "` " + buildWhere(metric, param);
        String countSql = "select count(1) total from " + sql;
        String dataSql = "select * from " + sql + buildOrderBy(param) + " limit " + param.getPageSize() + " offset " + param
            .getPageIndex() * param.getPageSize();
        return Mono.zip(
            operations.forQuery()
                      .query(countSql, ResultWrappers.map())
                      .singleOrEmpty()
                      .map(data -> CastUtils.castNumber(data.getOrDefault("total", 0)).intValue())
                      .defaultIfEmpty(0),
            operations.forQuery()
                      .query(dataSql, ResultWrappers.map())
                      .mapNotNull(map -> mapper.apply(convertToTsData(map)))
                      .collectList(),
            (total, data) -> PagerResult.of(total, data, param)
        );
    }

    // NOTE(review): appears unused — convertTs parses via DateTime.parse() instead.
    private static final DateTimeFormatter format = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Accepts epoch milliseconds or a parseable date string for `_ts`.
    private static Date convertTs(Object ts) {
        if (ts == null) {
            throw new IllegalArgumentException();
        }
        if (ts instanceof Number) {
            return new Date(((Number) ts).longValue());
        }
        return DateTime.parse(String.valueOf(ts)).toDate();
    }

    // Applies a value to the point as a tag or a field, depending on tagTest.
    // (Original comment said "influxDB"; this targets the TDengine point model.)
    public void applyValue(Point builder,
                           String metric,
                           String key,
                           Object value,
                           Predicate3<String, String, Object> tagTest) {
        if (value == null) {
            return;
        }
        if (tagTest.test(metric, key, value)) {
            builder.tag(key, String.valueOf(value));
        } else if (value instanceof Number) {
            builder.value(key, ((Number) value).doubleValue());
        } else if (value instanceof Date) {
            builder.value(key, ((Date) value).getTime());
        } else {
            if (value instanceof String) {
                builder.value(key, String.valueOf(value));
            } else {
                // Complex values (maps, lists, objects) are stored as JSON text.
                builder.value(key, ObjectMappers.toJsonString(value));
            }
        }
    }

    // Converts one TimeSeriesData into a write Point for the given metric.
    public Point convertToPoint(String metric, TimeSeriesData data, Predicate3<String, String, Object> tagTest) {
        Point point = Point.of(metric, null)
                           .timestamp(data.getTimestamp());
        for (Map.Entry<String, Object> entry : data.values().entrySet()) {
            applyValue(point, metric, entry.getKey(), entry.getValue(), tagTest);
        }
        return point;
    }

    // Writes a single data point.
    public Mono<Void> doSave(String metric, TimeSeriesData data, Predicate3<String, String, Object> tagTest) {
        return operations.forWrite().write(convertToPoint(metric, data, tagTest));
    }

    // Writes a stream of data points.
    public Mono<Void> doSave(String metric, Flux<TimeSeriesData> dataFlux, Predicate3<String, String, Object> tagTest) {
        return operations.forWrite().write(dataFlux.map(data -> convertToPoint(metric, data, tagTest)));
    }

    @Override
    public void dispose() {
        operations.dispose();
    }
}

View File

@ -0,0 +1,2 @@
org.jetlinks.community.tdengine.TDengineConfiguration
org.jetlinks.community.tdengine.things.TDengineThingDataConfiguration

View File

@ -7,7 +7,7 @@ import org.jetlinks.community.timeseries.TimeSeriesMetric;
*
* @author bestfeng
*
* @see org.jetlinks.pro.timeseries.TimeSeriesService
* @see org.jetlinks.community.timeseries.TimeSeriesService
* @see TimeSeriesMetric
*/
public interface AlarmTimeSeriesMetric {

View File

@ -233,7 +233,7 @@
</dependency>
<dependency>
<groupId>com.github.xiaoymin</groupId>
<groupId>com.github.xiaoymin</groupId>
<artifactId>knife4j-springdoc-ui</artifactId>
<version>2.0.8</version>
</dependency>
@ -243,6 +243,12 @@
<artifactId>configure-component</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jetlinks.community</groupId>
<artifactId>tdengine-component</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

View File

@ -22,12 +22,12 @@ spring:
pool:
max-active: 1024
timeout: 20s
# database: 3
# database: 3
# max-wait: 10s
r2dbc:
# 需要手动创建数据库,启动会自动创建表,修改了配置easyorm相关配置也要修改
url: r2dbc:postgresql://localhost:5432/jetlinks
# url: r2dbc:mysql://localhost:3306/jetlinks?ssl=false&serverZoneId=Asia/Shanghai # 修改了配置easyorm相关配置也要修改
# url: r2dbc:mysql://localhost:3306/jetlinks?ssl=false&serverZoneId=Asia/Shanghai # 修改了配置easyorm相关配置也要修改
username: postgres
password: jetlinks
pool:
@ -47,6 +47,14 @@ spring:
easyorm:
default-schema: public # 数据库默认的schema
dialect: postgres #数据库方言
tdengine:
enabled: false
database: jetlinks
restful:
endpoints:
- http://localhost:6041/
username: root
password: taosdata
elasticsearch:
embedded:
enabled: false # 为true时使用内嵌的elasticsearch,不建议在生产环境中使用
@ -62,7 +70,7 @@ device:
message:
writer:
time-series:
enabled: true #写出设备消息数据到elasticsearch
enabled: true #对设备数据进行持久化
captcha:
enabled: false # 开启验证码
ttl: 2m #验证码过期时间,2分钟
@ -72,9 +80,8 @@ hsweb:
configs:
- path: /**
allowed-headers: "*"
allowed-methods: ["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"]
allowed-origins: ["*"]
# allow-credentials: true
allowed-methods: [ "GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS" ]
allowed-origins: [ "*" ] ## 生产环境请替换为具体的域名端口如: http://xxxxx
max-age: 1800
dict:
enum-packages: org.jetlinks
@ -108,7 +115,7 @@ file:
manager:
storage-base-path: ./data/files
api:
# 访问api接口的根地址
# 访问api接口的根地址
base-path: http://127.0.0.1:${server.port}
jetlinks: