Merge remote-tracking branch 'origin/master'

# Conflicts:
#	jetlinks-manager/authentication-manager/src/main/java/org/jetlinks/community/auth/web/OrganizationController.java
This commit is contained in:
zhouhao
2021-08-04 20:41:44 +08:00
249 changed files with 3352 additions and 1778 deletions

27
.github/workflows/pull_request.yml vendored Normal file
View File

@@ -0,0 +1,27 @@
# This workflow will build a Java project with Maven
# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven
name: Java CI with Maven
on:
pull_request:
branches: [ master ]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8
- name: Cache Maven Repository
uses: actions/cache@v1
with:
path: ~/.m2
key: jetlinks-community-maven-repository
- name: Build with Maven
run: ./mvnw package -Dmaven.test.skip=true -Pbuild

1
.gitignore vendored
View File

@@ -27,3 +27,4 @@ hs_err_pid*
docker/data docker/data
!device-simulator.jar !device-simulator.jar
!demo-protocol-1.0.jar !demo-protocol-1.0.jar
application-local.yml

View File

@@ -1 +1 @@
distributionUrl=http://mirrors.hust.edu.cn/apache/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.zip distributionUrl=https://downloads.apache.org/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.zip

View File

@@ -1,8 +1,10 @@
# JetLinks 物联网基础平台 # JetLinks 物联网基础平台
![GitHub Workflow Status](https://img.shields.io/github/workflow/status/jetlinks/jetlinks-community/Auto%20Deploy%20Docker?label=docker) ![GitHub Workflow Status](https://img.shields.io/github/workflow/status/jetlinks/jetlinks-community/Auto%20Deploy%20Docker?label=docker)
![Version](https://img.shields.io/badge/Version-1.8--RELEASE-brightgreen) ![Version](https://img.shields.io/badge/version-1.9--RELEASE-brightgreen)
![QQ群2021514](https://img.shields.io/badge/QQ群-2021514-brightgreen) [![Codacy Badge](https://api.codacy.com/project/badge/Grade/e8d527d692c24633aba4f869c1c5d6ad)](https://app.codacy.com/gh/jetlinks/jetlinks-community?utm_source=github.com&utm_medium=referral&utm_content=jetlinks/jetlinks-community&utm_campaign=Badge_Grade_Settings)
[![QQ①群2021514](https://img.shields.io/badge/QQ①群-2021514-brightgreen)](https://qm.qq.com/cgi-bin/qm/qr?k=LGf0OPQqvLGdJIZST3VTcypdVWhdfAOG&jump_from=webapi)
[![QQ②群324606263](https://img.shields.io/badge/QQ②群-324606263-brightgreen)](https://qm.qq.com/cgi-bin/qm/qr?k=IMas2cH-TNsYxUcY8lRbsXqPnA2sGHYQ&jump_from=webapi)
![jetlinks](https://visitor-badge.glitch.me/badge?page_id=jetlinks) ![jetlinks](https://visitor-badge.glitch.me/badge?page_id=jetlinks)
JetLinks 基于Java8,Spring Boot 2.x,WebFlux,Netty,Vert.x,Reactor等开发, JetLinks 基于Java8,Spring Boot 2.x,WebFlux,Netty,Vert.x,Reactor等开发,
@@ -22,7 +24,8 @@ JetLinks 基于Java8,Spring Boot 2.x,WebFlux,Netty,Vert.x,Reactor等开发,
地理位置:统一管理地理位置信息,支持区域搜索. 地理位置:统一管理地理位置信息,支持区域搜索.
官方QQ: `2021514` 官方QQ: ①群 [2021514](https://qm.qq.com/cgi-bin/qm/qr?k=LGf0OPQqvLGdJIZST3VTcypdVWhdfAOG&jump_from=webapi)
, ②群 [324606263](https://qm.qq.com/cgi-bin/qm/qr?k=IMas2cH-TNsYxUcY8lRbsXqPnA2sGHYQ&jump_from=webapi)
## 技术栈 ## 技术栈

View File

@@ -1,51 +0,0 @@
version: '2'
services:
redis:
image: redis:5.0.4
container_name: jetlinks-ce-redis
# ports:
# - "6379:6379"
volumes:
- "redis-volume:/data"
command: redis-server --appendonly yes
environment:
- TZ=Asia/Shanghai
ui:
image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-ui-antd:1.4.0
container_name: jetlinks-ce-ui
ports:
- 9000:80
environment:
- "API_BASE_PATH=http://jetlinks:8848/" #API根路径
volumes:
- "jetlinks-upload-volume:/usr/share/nginx/html/upload"
links:
- jetlinks:jetlinks
jetlinks:
image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-standalone:1.4.0
container_name: jetlinks-ce
ports:
- 8848:8848 # API端口
- 1883:1883 # MQTT端口
- 8000:8000 # 预留
- 8001:8001 # 预留
- 8002:8002 # 预留
- 9000:9000 # elasticsearch
- 6379:6379 # redis
volumes:
- "jetlinks-upload-volume:/static/upload" # 持久化上传的文件
- "jetlinks-data-volume:/data"
environment:
# - "JAVA_OPTS=-Xms4g -Xmx18g -XX:+UseG1GC"
- "spring.profiles.active=dev,embedded" #使用dev和embedded环境.
- "hsweb.file.upload.static-location=http://127.0.0.1:8848/upload" #上传的静态文件访问根地址,为ui的地址.
- "logging.level.io.r2dbc=warn"
- "spring.redis.host=redis"
- "logging.level.org.springframework.data=warn"
- "logging.level.org.springframework=warn"
- "logging.level.org.jetlinks=warn"
- "logging.level.org.hswebframework=warn"
- "logging.level.org.springframework.data.r2dbc.connectionfactory=warn"
volumes:
jetlinks-upload-volume:
jetlinks-data-volume:

View File

@@ -7,7 +7,7 @@ services:
# - "6379:6379" # - "6379:6379"
volumes: volumes:
- "redis-volume:/data" - "redis-volume:/data"
command: redis-server --appendonly yes command: redis-server --appendonly yes --requirepass "JetLinks@redis"
environment: environment:
- TZ=Asia/Shanghai - TZ=Asia/Shanghai
elasticsearch: elasticsearch:
@@ -48,7 +48,7 @@ services:
POSTGRES_DB: jetlinks POSTGRES_DB: jetlinks
TZ: Asia/Shanghai TZ: Asia/Shanghai
ui: ui:
image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-ui-antd:1.9.0 image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-ui-antd:1.10.0
container_name: jetlinks-ce-ui container_name: jetlinks-ce-ui
ports: ports:
- 9000:80 - 9000:80
@@ -59,26 +59,29 @@ services:
links: links:
- jetlinks:jetlinks - jetlinks:jetlinks
jetlinks: jetlinks:
image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-standalone:1.9.0-SNAPSHOT image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-standalone:1.10.0
container_name: jetlinks-ce container_name: jetlinks-ce
ports: ports:
- 8848:8848 # API端口 - "8848:8848" # API端口
- 1883-1890:1883-1890 # 预留 - "1883-1890:1883-1890" # 预留
- 8000-8010:8000-8010 # 预留 - "8000-8010:8000-8010" # 预留
volumes: volumes:
- "jetlinks-volume:/application/static/upload" # 持久化上传的文件 - "jetlinks-volume:/application/static/upload" # 持久化上传的文件
- "jetlinks-protocol-volume:/application/data/protocols" - "jetlinks-protocol-volume:/application/data/protocols"
environment: environment:
# - "JAVA_OPTS=-Xms4g -Xmx18g -XX:+UseG1GC" - "JAVA_OPTS=-Duser.language=zh -XX:+UseG1GC"
- "TZ=Asia/Shanghai" - "TZ=Asia/Shanghai"
- "hsweb.file.upload.static-location=http://127.0.0.1:8848/upload" #上传的静态文件访问根地址,为ui的地址. - "hsweb.file.upload.static-location=http://127.0.0.1:8848/upload" #上传的静态文件访问根地址,为ui的地址.
- "spring.r2dbc.url=r2dbc:postgresql://postgres:5432/jetlinks" #数据库连接地址 - "spring.r2dbc.url=r2dbc:postgresql://postgres:5432/jetlinks" #数据库连接地址
- "spring.r2dbc.username=postgres" - "spring.r2dbc.username=postgres"
- "spring.r2dbc.password=jetlinks" - "spring.r2dbc.password=jetlinks"
- "elasticsearch.client.host=elasticsearch" - "spring.data.elasticsearch.client.reactive.endpoints=elasticsearch:9200"
- "elasticsearch.client.post=9200" # - "spring.data.elasticsearch.client.reactive.username=admin"
# - "spring.data.elasticsearch.client.reactive.password=admin"
# - "spring.reactor.debug-agent.enabled=false" #设置为false能提升性能
- "spring.redis.host=redis" - "spring.redis.host=redis"
- "spring.redis.port=6379" - "spring.redis.port=6379"
- "spring.redis.password=JetLinks@redis"
- "logging.level.io.r2dbc=warn" - "logging.level.io.r2dbc=warn"
- "logging.level.org.springframework.data=warn" - "logging.level.org.springframework.data=warn"
- "logging.level.org.springframework=warn" - "logging.level.org.springframework=warn"

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>jetlinks-components</artifactId> <artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId> <groupId>org.jetlinks.community</groupId>
<version>1.9.0-SNAPSHOT</version> <version>1.10.0</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
@@ -34,9 +34,15 @@
<groupId>io.micrometer</groupId> <groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId> <artifactId>micrometer-core</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.jetlinks</groupId> <groupId>org.jetlinks</groupId>
<artifactId>reactor-ql</artifactId> <artifactId>reactor-ql</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@@ -3,6 +3,9 @@ package org.jetlinks.community;
import org.jetlinks.core.config.ConfigKey; import org.jetlinks.core.config.ConfigKey;
/** /**
* 数据验证配置常量类
*
* @author zhouhao
* @see ConfigKey * @see ConfigKey
*/ */
public interface ConfigMetadataConstants { public interface ConfigMetadataConstants {

View File

@@ -3,7 +3,6 @@ package org.jetlinks.community;
import org.jetlinks.core.config.ConfigKey; import org.jetlinks.core.config.ConfigKey;
import org.jetlinks.core.message.HeaderKey; import org.jetlinks.core.message.HeaderKey;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;

View File

@@ -0,0 +1,85 @@
package org.jetlinks.community;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.reactor.ql.utils.CastUtils;
public interface PropertyMetadataConstants {
/**
* 属性来源
*/
interface Source {
//数据来源
String id = "source";
//手动写值
String manual = "manual";
//规则,虚拟属性
String rule = "rule";
/**
* 判断属性是否手动赋值
*
* @param metadata 属性物模型
* @return 是否手动赋值
*/
static boolean isManual(PropertyMetadata metadata) {
return metadata.getExpand(id)
.map(manual::equals)
.orElse(false);
}
/**
* 判断属性是否为规则
*
* @param metadata 物模型
* @return 是否规则
*/
static boolean isRule(PropertyMetadata metadata) {
return metadata
.getExpand(id)
.map(rule::equals)
.orElse(false);
}
}
/**
* 属性读写模式
*/
interface AccessMode {
String id = "accessMode";
//读
String read = "r";
//写
String write = "w";
//上报
String report = "u";
static boolean isRead(PropertyMetadata property) {
return property
.getExpand(id)
.map(val -> val.toString().contains(read))
.orElse(true);
}
static boolean isWrite(PropertyMetadata property) {
return property
.getExpand(id)
.map(val -> val.toString().contains(write))
.orElseGet(() -> property
.getExpand("readOnly")
.map(readOnly -> !CastUtils.castBoolean(readOnly))
.orElse(true)
);
}
static boolean isReport(PropertyMetadata property) {
return property
.getExpand(id)
.map(val -> val.toString().contains(report))
.orElse(true);
}
}
}

View File

@@ -8,6 +8,6 @@ public class Version {
private final String edition = "community"; private final String edition = "community";
private final String version = "1.9.0-SNAPSHOT"; private final String version = "1.9.0";
} }

View File

@@ -9,6 +9,11 @@ import org.jetlinks.community.utils.TimeUtils;
import java.util.Date; import java.util.Date;
/**
* 时间反序列化配置
*
* @author zhouhao
*/
public class SmartDateDeserializer extends JsonDeserializer<Date> { public class SmartDateDeserializer extends JsonDeserializer<Date> {
@Override @Override
@SneakyThrows @SneakyThrows

View File

@@ -7,7 +7,6 @@ import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;

View File

@@ -21,7 +21,6 @@ package org.jetlinks.community.utils;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.hswebframework.utils.time.DateFormatter;
import org.jetlinks.core.metadata.types.DateTimeType; import org.jetlinks.core.metadata.types.DateTimeType;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
@@ -30,6 +29,11 @@ import java.time.temporal.TemporalAdjusters;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
/**
* 时间转换工具
*
* @author zhouhao
*/
public class DateMathParser { public class DateMathParser {

View File

@@ -5,17 +5,15 @@ import org.hswebframework.web.exception.NotFoundException;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
/** /**
* 异常处理工具
*
* @author wangzheng * @author wangzheng
* @see * @see
* @since 1.0 * @since 1.0
*/ */
public class ErrorUtils { public class ErrorUtils {
public static <T> Mono<T> notFound(String message){ public static <T> Mono<T> notFound(String message) {
return Mono.error(()->new NotFoundException(message)); return Mono.error(() -> new NotFoundException(message));
}
public static <T> Mono<T> accessDeny(String message){
return Mono.error(()->new AccessDenyException(message));
} }
} }

View File

@@ -5,6 +5,11 @@ import java.time.Duration;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.Date; import java.util.Date;
/**
* 时间工具类
*
* @author zhouhao
*/
public class TimeUtils { public class TimeUtils {

View File

@@ -0,0 +1,106 @@
package org.jetlinks.community.web;
import com.fasterxml.jackson.databind.JsonMappingException;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.ezorm.rdb.exception.DuplicateKeyException;
import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
import org.hswebframework.web.crud.web.ResponseMessage;
import org.hswebframework.web.i18n.LocaleUtils;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.core.codec.DecodingException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.stream.Collectors;
@RestControllerAdvice
@Slf4j
@Order(Ordered.HIGHEST_PRECEDENCE)
public class ErrorControllerAdvice {
@ExceptionHandler
@ResponseStatus(HttpStatus.BAD_REQUEST)
public Mono<ResponseMessage<?>> handleException(DecodingException decodingException) {
Throwable cause = decodingException.getCause();
if (cause != null) {
if (cause instanceof JsonMappingException) {
return handleException(((JsonMappingException) cause));
}
return Mono.just(ResponseMessage.error(400, "illegal_argument", cause.getMessage()));
}
return Mono.just(ResponseMessage.error(400, "illegal_argument", decodingException.getMessage()));
}
@ExceptionHandler
@ResponseStatus(HttpStatus.BAD_REQUEST)
public Mono<ResponseMessage<?>> handleException(JsonMappingException decodingException) {
Throwable cause = decodingException.getCause();
if (cause != null) {
if (cause instanceof IllegalArgumentException) {
return Mono.just(ResponseMessage.error(400, "illegal_argument", cause.getMessage()));
}
return Mono.just(ResponseMessage.error(400, "illegal_argument", cause.getMessage()));
}
return Mono.just(ResponseMessage.error(400, "illegal_argument", decodingException.getMessage()));
}
@ExceptionHandler
@ResponseStatus(HttpStatus.BAD_REQUEST)
public Mono<ResponseMessage<?>> handleException(DuplicateKeyException e) {
List<String> columns = e
.getColumns()
.stream()
.map(RDBColumnMetadata::getAlias)
.collect(Collectors.toList());
if (columns.isEmpty()) {
return LocaleUtils
.resolveMessageReactive("error.duplicate_key")
.map(msg -> ResponseMessage.error(400, "duplicate_key", msg));
}
return LocaleUtils
.resolveMessageReactive("error.duplicate_key_detail", columns)
.map(msg -> ResponseMessage.error(400, "duplicate_key", msg).result(columns));
}
@ExceptionHandler
public Mono<ResponseEntity<ResponseMessage<Object>>> handleException(DeviceOperationException e) {
//200
if (e.getCode() == ErrorCode.REQUEST_HANDLING) {
return LocaleUtils
.resolveMessageReactive("message.device_message_handing")
.map(msg -> ResponseEntity
.status(HttpStatus.OK)
.body(ResponseMessage.error(200, "request_handling", msg).result(msg))
);
}
HttpStatus _status = HttpStatus.INTERNAL_SERVER_ERROR;
if (e.getCode() == ErrorCode.FUNCTION_UNDEFINED
|| e.getCode() == ErrorCode.PARAMETER_UNDEFINED) {
//404
_status = HttpStatus.NOT_FOUND;
} else if (e.getCode() == ErrorCode.PARAMETER_UNDEFINED) {
//400
_status = HttpStatus.BAD_REQUEST;
}
HttpStatus status = _status;
return LocaleUtils
.resolveMessageReactive(e.getCode().getText())
.map(msg -> ResponseEntity
.status(status)
.body(ResponseMessage.error(status.value(), e.getCode().name().toLowerCase(), msg))
);
}
}

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>jetlinks-components</artifactId> <artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId> <groupId>org.jetlinks.community</groupId>
<version>1.9.0-SNAPSHOT</version> <version>1.10.0</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@@ -3,7 +3,6 @@ package org.jetlinks.community.dashboard;
import org.jetlinks.core.metadata.ConfigMetadata; import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.DataType;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
/** /**
* 指标维度,如: 每小时,服务器1 * 指标维度,如: 每小时,服务器1

View File

@@ -17,6 +17,7 @@ import java.util.Optional;
public class MeasurementParameter implements ValueObject { public class MeasurementParameter implements ValueObject {
private Map<String, Object> params = new HashMap<>(); private Map<String, Object> params = new HashMap<>();
@Override
public Optional<Object> get(String name) { public Optional<Object> get(String name) {
return Optional.ofNullable(params).map(p -> p.get(name)); return Optional.ofNullable(params).map(p -> p.get(name));
} }

View File

@@ -1,13 +1,13 @@
package org.jetlinks.community.dashboard.measurements; package org.jetlinks.community.dashboard.measurements;
import org.hswebframework.utils.time.DateFormatter; import org.hswebframework.utils.time.DateFormatter;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
import org.jetlinks.core.metadata.ConfigMetadata; import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.types.DoubleType; import org.jetlinks.core.metadata.types.DoubleType;
import org.jetlinks.core.metadata.unit.UnifyUnit; import org.jetlinks.core.metadata.unit.UnifyUnit;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;

View File

@@ -3,18 +3,17 @@ package org.jetlinks.community.dashboard.measurements;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import org.hswebframework.utils.time.DateFormatter; import org.hswebframework.utils.time.DateFormatter;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
import org.jetlinks.core.metadata.ConfigMetadata; import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.SimplePropertyMetadata; import org.jetlinks.core.metadata.SimplePropertyMetadata;
import org.jetlinks.core.metadata.types.DoubleType; import org.jetlinks.core.metadata.types.DoubleType;
import org.jetlinks.core.metadata.types.LongType; import org.jetlinks.core.metadata.types.LongType;
import org.jetlinks.core.metadata.types.ObjectType; import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean; import java.lang.management.MemoryMXBean;

View File

@@ -1,13 +1,13 @@
package org.jetlinks.community.dashboard.measurements; package org.jetlinks.community.dashboard.measurements;
import org.hswebframework.utils.time.DateFormatter; import org.hswebframework.utils.time.DateFormatter;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
import org.jetlinks.core.metadata.ConfigMetadata; import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.types.DoubleType; import org.jetlinks.core.metadata.types.DoubleType;
import org.jetlinks.core.metadata.unit.UnifyUnit; import org.jetlinks.core.metadata.unit.UnifyUnit;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;

View File

@@ -3,20 +3,18 @@ package org.jetlinks.community.dashboard.measurements;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import org.hswebframework.utils.time.DateFormatter; import org.hswebframework.utils.time.DateFormatter;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
import org.jetlinks.core.metadata.ConfigMetadata; import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.SimplePropertyMetadata; import org.jetlinks.core.metadata.SimplePropertyMetadata;
import org.jetlinks.core.metadata.types.DoubleType; import org.jetlinks.core.metadata.types.DoubleType;
import org.jetlinks.core.metadata.types.LongType; import org.jetlinks.core.metadata.types.LongType;
import org.jetlinks.core.metadata.types.ObjectType; import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.time.Duration; import java.time.Duration;
import java.util.Date; import java.util.Date;

View File

@@ -3,7 +3,6 @@ package org.jetlinks.community.dashboard.measurements;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import reactor.core.publisher.Flux;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean; import java.lang.management.OperatingSystemMXBean;

View File

@@ -5,7 +5,10 @@ import org.hswebframework.web.authorization.annotation.Authorize;
import org.hswebframework.web.authorization.annotation.QueryAction; import org.hswebframework.web.authorization.annotation.QueryAction;
import org.hswebframework.web.authorization.annotation.Resource; import org.hswebframework.web.authorization.annotation.Resource;
import org.hswebframework.web.exception.NotFoundException; import org.hswebframework.web.exception.NotFoundException;
import org.jetlinks.community.dashboard.*; import org.jetlinks.community.dashboard.DashboardManager;
import org.jetlinks.community.dashboard.DashboardObject;
import org.jetlinks.community.dashboard.MeasurementParameter;
import org.jetlinks.community.dashboard.MeasurementValue;
import org.jetlinks.community.dashboard.web.request.DashboardMeasurementRequest; import org.jetlinks.community.dashboard.web.request.DashboardMeasurementRequest;
import org.jetlinks.community.dashboard.web.response.DashboardInfo; import org.jetlinks.community.dashboard.web.response.DashboardInfo;
import org.jetlinks.community.dashboard.web.response.DashboardMeasurementResponse; import org.jetlinks.community.dashboard.web.response.DashboardMeasurementResponse;

View File

@@ -2,9 +2,9 @@ package org.jetlinks.community.dashboard.web.response;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import org.jetlinks.community.dashboard.MeasurementDimension;
import org.jetlinks.core.metadata.ConfigMetadata; import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.DataType;
import org.jetlinks.community.dashboard.MeasurementDimension;
@Getter @Getter
@Setter @Setter

View File

@@ -2,8 +2,8 @@ package org.jetlinks.community.dashboard.web.response;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.community.dashboard.Measurement; import org.jetlinks.community.dashboard.Measurement;
import org.jetlinks.core.metadata.DataType;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.List; import java.util.List;

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>jetlinks-components</artifactId> <artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId> <groupId>org.jetlinks.community</groupId>
<version>1.9.0-SNAPSHOT</version> <version>1.10.0</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@@ -7,7 +7,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBounds; import org.elasticsearch.search.aggregations.bucket.histogram.LongBounds;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.hswebframework.ezorm.core.param.QueryParam; import org.hswebframework.ezorm.core.param.QueryParam;
import org.hswebframework.ezorm.core.param.TermType; import org.hswebframework.ezorm.core.param.TermType;
@@ -27,7 +27,6 @@ import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
import org.jetlinks.community.elastic.search.utils.ReactorActionListener; import org.jetlinks.community.elastic.search.utils.ReactorActionListener;
import org.jetlinks.community.timeseries.query.AggregationQueryParam; import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink; import reactor.core.publisher.MonoSink;
@@ -240,8 +239,8 @@ public class DefaultAggregationService implements AggregationService {
return structure; return structure;
} }
protected static ExtendedBounds getExtendedBounds(AggregationQueryParam param) { protected static LongBounds getExtendedBounds(AggregationQueryParam param) {
return new ExtendedBounds(calculateStartWithTime(param), param.getEndWithTime()); return new LongBounds(calculateStartWithTime(param), param.getEndWithTime());
} }
private static long calculateStartWithTime(AggregationQueryParam param) { private static long calculateStartWithTime(AggregationQueryParam param) {

View File

@@ -1,7 +1,7 @@
package org.jetlinks.community.elastic.search.aggreation.bucket; package org.jetlinks.community.elastic.search.aggreation.bucket;
import lombok.*; import lombok.*;
import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBounds; import org.elasticsearch.search.aggregations.bucket.histogram.LongBounds;
import org.hswebframework.utils.StringUtils; import org.hswebframework.utils.StringUtils;
import org.jetlinks.community.elastic.search.aggreation.enums.BucketType; import org.jetlinks.community.elastic.search.aggreation.enums.BucketType;
import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsAggregationStructure; import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsAggregationStructure;
@@ -37,7 +37,7 @@ public class BucketAggregationsStructure {
private List<Ranges> ranges; private List<Ranges> ranges;
private ExtendedBounds extendedBounds; private LongBounds extendedBounds;
/** /**
* 时间格式 * 时间格式

View File

@@ -16,7 +16,6 @@ import org.jetlinks.community.elastic.search.embedded.EmbeddedElasticSearch;
import org.jetlinks.community.elastic.search.embedded.EmbeddedElasticSearchProperties; import org.jetlinks.community.elastic.search.embedded.EmbeddedElasticSearchProperties;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
import org.jetlinks.community.elastic.search.service.reactive.DefaultReactiveElasticsearchClient; import org.jetlinks.community.elastic.search.service.reactive.DefaultReactiveElasticsearchClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;

View File

@@ -1,7 +1,7 @@
package org.jetlinks.community.elastic.search.index; package org.jetlinks.community.elastic.search.index;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter; import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
import org.jetlinks.core.metadata.PropertyMetadata;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;

View File

@@ -9,10 +9,6 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.compress.CompressedXContent;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.SimplePropertyMetadata;
import org.jetlinks.core.metadata.types.*;
import org.jetlinks.community.elastic.search.enums.ElasticDateFormat; import org.jetlinks.community.elastic.search.enums.ElasticDateFormat;
import org.jetlinks.community.elastic.search.enums.ElasticPropertyType; import org.jetlinks.community.elastic.search.enums.ElasticPropertyType;
import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexMetadata; import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexMetadata;
@@ -20,6 +16,10 @@ import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexStrategy; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexStrategy;
import org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient; import org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.SimplePropertyMetadata;
import org.jetlinks.core.metadata.types.*;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.*; import java.util.*;

View File

@@ -37,7 +37,6 @@ import org.jetlinks.community.elastic.search.utils.ReactorActionListener;
import org.jetlinks.core.utils.FluxUtils; import org.jetlinks.core.utils.FluxUtils;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@@ -119,12 +118,14 @@ public class DefaultElasticSearchService implements ElasticSearchService {
}); });
} }
@Override
public <T> Flux<T> query(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) { public <T> Flux<T> query(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this return this
.doQuery(new String[]{index}, queryParam) .doQuery(new String[]{index}, queryParam)
.flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)); .flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper));
} }
@Override
public <T> Flux<T> query(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) { public <T> Flux<T> query(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this return this
.doQuery(index, queryParam) .doQuery(index, queryParam)
@@ -373,7 +374,7 @@ public class DefaultElasticSearchService implements ElasticSearchService {
private Mono<SearchResponse> doSearch(SearchRequest request) { private Mono<SearchResponse> doSearch(SearchRequest request) {
return this return this
.<SearchRequest, SearchResponse>execute(request, restClient.getQueryClient()::searchAsync) .execute(request, restClient.getQueryClient()::searchAsync)
.onErrorResume(err -> { .onErrorResume(err -> {
log.error("query elastic error", err); log.error("query elastic error", err);
return Mono.empty(); return Mono.empty();

View File

@@ -12,6 +12,11 @@ import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.function.Function; import java.util.function.Function;
/**
* ES数据库业务操作类
*
* @author zhouhao
*/
public interface ElasticSearchService { public interface ElasticSearchService {
default <T> Mono<PagerResult<T>> queryPager(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) { default <T> Mono<PagerResult<T>> queryPager(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {

View File

@@ -60,7 +60,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*; import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbers;
@@ -111,12 +110,11 @@ import java.util.StringJoiner;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
import static org.elasticsearch.rest.BaseRestHandler.INCLUDE_TYPE_NAME_PARAMETER;
import static org.springframework.data.elasticsearch.client.util.RequestConverters.createContentType; import static org.springframework.data.elasticsearch.client.util.RequestConverters.createContentType;
@Slf4j @Slf4j
public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient { public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient {
private final HostProvider hostProvider; private final HostProvider<?> hostProvider;
private final RequestCreator requestCreator; private final RequestCreator requestCreator;
private Supplier<HttpHeaders> headersSupplier = () -> HttpHeaders.EMPTY; private Supplier<HttpHeaders> headersSupplier = () -> HttpHeaders.EMPTY;

View File

@@ -10,8 +10,8 @@ import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBounds;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.LongBounds;
import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
@@ -293,9 +293,9 @@ public class ReactiveAggregationService implements AggregationService {
return queryParam; return queryParam;
} }
protected static ExtendedBounds getExtendedBounds(AggregationQueryParam param) { protected static LongBounds getExtendedBounds(AggregationQueryParam param) {
return new ExtendedBounds(calculateStartWithTime(param), param.getEndWithTime()); return new LongBounds(calculateStartWithTime(param), param.getEndWithTime());
} }
//聚合查询默认的时间间隔 //聚合查询默认的时间间隔

View File

@@ -27,12 +27,12 @@ import org.hswebframework.utils.time.DateFormatter;
import org.hswebframework.utils.time.DefaultDateFormatter; import org.hswebframework.utils.time.DefaultDateFormatter;
import org.hswebframework.web.api.crud.entity.PagerResult; import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.bean.FastBeanCopier; import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.core.utils.FluxUtils;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.service.ElasticSearchService; import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter; import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
import org.jetlinks.community.elastic.search.utils.QueryParamTranslator; import org.jetlinks.community.elastic.search.utils.QueryParamTranslator;
import org.jetlinks.core.utils.FluxUtils;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.DependsOn;
@@ -57,6 +57,8 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* 响应式ES数据库操作类
*
* @author zhouhao * @author zhouhao
* @since 1.0 * @since 1.0
**/ **/
@@ -66,20 +68,23 @@ import java.util.stream.Collectors;
@ConfigurationProperties(prefix = "elasticsearch") @ConfigurationProperties(prefix = "elasticsearch")
public class ReactiveElasticSearchService implements ElasticSearchService { public class ReactiveElasticSearchService implements ElasticSearchService {
private final ReactiveElasticsearchClient restClient;
private final ElasticSearchIndexManager indexManager;
private FluxSink<Buffer> sink;
public static final IndicesOptions indexOptions = IndicesOptions.fromOptions( public static final IndicesOptions indexOptions = IndicesOptions.fromOptions(
true, true, false, false true, true, false, false
); );
//使用对象池处理Buffer,减少GC消耗
static ObjectPool<Buffer> pool = ObjectPool.newPool(Buffer::new);
static { static {
DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ")); DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
} }
private final ReactiveElasticsearchClient restClient;
private final ElasticSearchIndexManager indexManager;
private FluxSink<Buffer> sink;
@Getter
@Setter
private BufferConfig buffer = new BufferConfig();
public ReactiveElasticSearchService(ReactiveElasticsearchClient restClient, public ReactiveElasticSearchService(ReactiveElasticsearchClient restClient,
ElasticSearchIndexManager indexManager) { ElasticSearchIndexManager indexManager) {
this.restClient = restClient; this.restClient = restClient;
@@ -119,12 +124,14 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
}); });
} }
@Override
public <T> Flux<T> query(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) { public <T> Flux<T> query(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this return this
.doQuery(new String[]{index}, queryParam) .doQuery(new String[]{index}, queryParam)
.flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)); .flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper));
} }
@Override
public <T> Flux<T> query(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) { public <T> Flux<T> query(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this return this
.doQuery(index, queryParam) .doQuery(index, queryParam)
@@ -134,16 +141,16 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
@Override @Override
public <T> Mono<PagerResult<T>> queryPager(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) { public <T> Mono<PagerResult<T>> queryPager(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this.doQuery(index, queryParam) return this.doQuery(index, queryParam)
.flatMap(tp2 -> this .flatMap(tp2 -> this
.convertQueryResult(tp2.getT1(), tp2.getT2(), mapper) .convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)
.collectList() .collectList()
.filter(CollectionUtils::isNotEmpty) .filter(CollectionUtils::isNotEmpty)
.map(list -> PagerResult.of((int) tp2 .map(list -> PagerResult.of((int) tp2
.getT2() .getT2()
.getHits() .getHits()
.getTotalHits().value, list, queryParam)) .getTotalHits().value, list, queryParam))
) )
.switchIfEmpty(Mono.fromSupplier(PagerResult::empty)); .switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
} }
private <T> Flux<T> convertQueryResult(List<ElasticSearchIndexMetadata> indexList, private <T> Flux<T> convertQueryResult(List<ElasticSearchIndexMetadata> indexList,
@@ -162,8 +169,8 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
} }
return mapper return mapper
.apply(Optional .apply(Optional
.ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0)) .ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
.convertFromElastic(hitMap)); .convertFromElastic(hitMap));
}); });
} }
@@ -184,7 +191,6 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
}); });
} }
@Override @Override
public Mono<Long> count(String[] index, QueryParam queryParam) { public Mono<Long> count(String[] index, QueryParam queryParam) {
QueryParam param = queryParam.clone(); QueryParam param = queryParam.clone();
@@ -223,8 +229,8 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
@Override @Override
public <T> Mono<Void> commit(String index, Publisher<T> data) { public <T> Mono<Void> commit(String index, Publisher<T> data) {
return Flux.from(data) return Flux.from(data)
.flatMap(d -> commit(index, d)) .flatMap(d -> commit(index, d))
.then(); .then();
} }
@Override @Override
@@ -235,10 +241,10 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
@Override @Override
public <T> Mono<Void> save(String index, Publisher<T> data) { public <T> Mono<Void> save(String index, Publisher<T> data) {
return Flux.from(data) return Flux.from(data)
.map(v -> Buffer.of(index, v)) .map(v -> Buffer.of(index, v))
.collectList() .collectList()
.flatMap(this::doSave) .flatMap(this::doSave)
.then(); .then();
} }
@Override @Override
@@ -251,32 +257,6 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
sink.complete(); sink.complete();
} }
@Getter
@Setter
private BufferConfig buffer = new BufferConfig();
@Getter
@Setter
public static class BufferConfig {
//最小间隔
private int rate = Integer.getInteger("elasticsearch.buffer.rate", 1000);
//缓冲最大数量
private int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000);
//缓冲超时时间
private Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3));
//背压堆积数量限制.
private int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", Runtime
.getRuntime()
.availableProcessors());
//最大缓冲字节
private DataSize bufferBytes = DataSize.parse(System.getProperty("elasticsearch.buffer.bytes", "15MB"));
//最大重试次数
private int maxRetry = 3;
//重试间隔
private Duration minBackoff = Duration.ofSeconds(3);
}
//@PostConstruct //@PostConstruct
public void init() { public void init() {
int flushRate = buffer.rate; int flushRate = buffer.rate;
@@ -288,10 +268,10 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
FluxUtils FluxUtils
.bufferRate(Flux.<Buffer>create(sink -> this.sink = sink), .bufferRate(Flux.<Buffer>create(sink -> this.sink = sink),
flushRate, flushRate,
bufferSize, bufferSize,
bufferTimeout, bufferTimeout,
(b, l) -> bufferedBytes.addAndGet(b.numberOfBytes()) >= bufferBytes) (b, l) -> bufferedBytes.addAndGet(b.numberOfBytes()) >= bufferBytes)
.doOnNext(buf -> bufferedBytes.set(0)) .doOnNext(buf -> bufferedBytes.set(0))
.onBackpressureBuffer(bufferBackpressure, drop -> { .onBackpressureBuffer(bufferBackpressure, drop -> {
// TODO: 2020/11/25 将丢弃的数据存储到本地磁盘 // TODO: 2020/11/25 将丢弃的数据存储到本地磁盘
@@ -319,19 +299,186 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
}) })
.onErrorResume((err) -> Mono .onErrorResume((err) -> Mono
.fromRunnable(() -> System.err.println("保存ElasticSearch数据失败:\n" + .fromRunnable(() -> System.err.println("保存ElasticSearch数据失败:\n" +
org.hswebframework.utils.StringUtils.throwable2String(err)))) org.hswebframework.utils.StringUtils.throwable2String(err))))
.subscribe(); .subscribe();
} }
//使用对象池处理Buffer,减少GC消耗 private Mono<String> getIndexForSave(String index) {
static ObjectPool<Buffer> pool = ObjectPool.newPool(Buffer::new); return indexManager
.getIndexStrategy(index)
.map(strategy -> strategy.getIndexForSave(index));
}
private Mono<String> getIndexForSearch(String index) {
return indexManager
.getIndexStrategy(index)
.map(strategy -> strategy.getIndexForSearch(index));
}
protected Mono<Integer> doSave(Collection<Buffer> buffers) {
return Flux.fromIterable(buffers)
.groupBy(Buffer::getIndex, Integer.MAX_VALUE)
.flatMap(group -> {
String index = group.key();
return this
.getIndexForSave(index)
.flatMapMany(realIndex -> group
.map(buffer -> {
try {
IndexRequest request;
if (buffer.id != null) {
request = new IndexRequest(realIndex).type("_doc").id(buffer.id);
} else {
request = new IndexRequest(realIndex).type("_doc");
}
request.source(buffer.payload, XContentType.JSON);
return request;
} finally {
buffer.release();
}
}));
})
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(lst -> {
BulkRequest request = new BulkRequest();
request.timeout(TimeValue.timeValueSeconds(9));
lst.forEach(request::add);
return restClient
.bulk(request)
.as(save -> {
if (buffer.maxRetry > 0) {
return save.retryWhen(Retry.backoff(buffer.maxRetry, buffer.minBackoff));
}
return save;
});
})
.doOnNext(response -> {
if (response.hasFailures()) {
System.err.println(response.buildFailureMessage());
}
})
.thenReturn(buffers.size());
}
@SneakyThrows
protected void checkResponse(BulkResponse response) {
if (response.hasFailures()) {
for (BulkItemResponse item : response.getItems()) {
if (item.isFailed()) {
throw item.getFailure().getCause();
}
}
}
}
private <T> List<T> translate(Function<Map<String, Object>, T> mapper, SearchResponse response) {
return Arrays.stream(response.getHits().getHits())
.map(hit -> {
Map<String, Object> hitMap = hit.getSourceAsMap();
if (StringUtils.isEmpty(hitMap.get("id"))) {
hitMap.put("id", hit.getId());
}
return mapper.apply(hitMap);
})
.collect(Collectors.toList());
}
private Flux<SearchHit> doSearch(SearchRequest request) {
return restClient
.search(request)
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
}
private Mono<Long> doCount(SearchRequest request) {
return restClient
.count(request)
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
}
protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, String... indexes) {
return indexManager
.getIndexesMetadata(indexes)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> createSearchRequest(queryParam, list));
}
protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> indexes) {
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
return Flux.fromIterable(indexes)
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList ->
new SearchRequest(indexList.toArray(new String[0]))
.source(builder)
.indicesOptions(indexOptions));
}
protected Mono<QueryBuilder> createQueryBuilder(QueryParam queryParam, String index) {
return indexManager
.getIndexMetadata(index)
.map(metadata -> QueryParamTranslator.createQueryBuilder(queryParam, metadata))
.switchIfEmpty(Mono.fromSupplier(() -> QueryParamTranslator.createQueryBuilder(queryParam, null)));
}
protected Mono<CountRequest> createCountRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> indexes) {
QueryParam tempQueryParam = queryParam.clone();
tempQueryParam.setPaging(false);
tempQueryParam.setSorts(Collections.emptyList());
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
return Flux.fromIterable(indexes)
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder));
}
private Mono<CountRequest> createCountRequest(QueryParam queryParam, String... index) {
return indexManager
.getIndexesMetadata(index)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> createCountRequest(queryParam, list));
}
@Getter
@Setter
public static class BufferConfig {
//最小间隔
private int rate = Integer.getInteger("elasticsearch.buffer.rate", 1000);
//缓冲最大数量
private int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000);
//缓冲超时时间
private Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3));
//背压堆积数量限制.
private int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", Runtime
.getRuntime()
.availableProcessors());
//最大缓冲字节
private DataSize bufferBytes = DataSize.parse(System.getProperty("elasticsearch.buffer.bytes", "15MB"));
//最大重试次数
private int maxRetry = 3;
//重试间隔
private Duration minBackoff = Duration.ofSeconds(3);
}
@Getter @Getter
static class Buffer { static class Buffer {
final ObjectPool.Handle<Buffer> handle;
String index; String index;
String id; String id;
String payload; String payload;
final ObjectPool.Handle<Buffer> handle;
public Buffer(ObjectPool.Handle<Buffer> handle) { public Buffer(ObjectPool.Handle<Buffer> handle) {
this.handle = handle; this.handle = handle;
@@ -367,153 +514,4 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
return payload == null ? 0 : payload.length() * 2; return payload == null ? 0 : payload.length() * 2;
} }
} }
private Mono<String> getIndexForSave(String index) {
return indexManager
.getIndexStrategy(index)
.map(strategy -> strategy.getIndexForSave(index));
}
private Mono<String> getIndexForSearch(String index) {
return indexManager
.getIndexStrategy(index)
.map(strategy -> strategy.getIndexForSearch(index));
}
protected Mono<Integer> doSave(Collection<Buffer> buffers) {
return Flux.fromIterable(buffers)
.groupBy(Buffer::getIndex,Integer.MAX_VALUE)
.flatMap(group -> {
String index = group.key();
return this
.getIndexForSave(index)
.flatMapMany(realIndex -> group
.map(buffer -> {
try {
IndexRequest request;
if (buffer.id != null) {
request = new IndexRequest(realIndex).type("_doc").id(buffer.id);
} else {
request = new IndexRequest(realIndex).type("_doc");
}
request.source(buffer.payload, XContentType.JSON);
return request;
} finally {
buffer.release();
}
}));
})
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(lst -> {
BulkRequest request = new BulkRequest();
request.timeout(TimeValue.timeValueSeconds(9));
lst.forEach(request::add);
return restClient
.bulk(request)
.as(save -> {
if (buffer.maxRetry > 0) {
return save.retryWhen(Retry.backoff(buffer.maxRetry, buffer.minBackoff));
}
return save;
});
})
.doOnNext(response -> {
if (response.hasFailures()) {
System.err.println(response.buildFailureMessage());
}
})
.thenReturn(buffers.size());
}
@SneakyThrows
protected void checkResponse(BulkResponse response) {
if (response.hasFailures()) {
for (BulkItemResponse item : response.getItems()) {
if (item.isFailed()) {
throw item.getFailure().getCause();
}
}
}
}
private <T> List<T> translate(Function<Map<String, Object>, T> mapper, SearchResponse response) {
return Arrays.stream(response.getHits().getHits())
.map(hit -> {
Map<String, Object> hitMap = hit.getSourceAsMap();
if (StringUtils.isEmpty(hitMap.get("id"))) {
hitMap.put("id", hit.getId());
}
return mapper.apply(hitMap);
})
.collect(Collectors.toList());
}
private Flux<SearchHit> doSearch(SearchRequest request) {
return restClient
.search(request)
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
}
private Mono<Long> doCount(SearchRequest request) {
return restClient
.count(request)
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
}
protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, String... indexes) {
return indexManager
.getIndexesMetadata(indexes)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> createSearchRequest(queryParam, list));
}
protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> indexes) {
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
return Flux.fromIterable(indexes)
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList ->
new SearchRequest(indexList.toArray(new String[0]))
.source(builder)
.indicesOptions(indexOptions));
}
protected Mono<QueryBuilder> createQueryBuilder(QueryParam queryParam, String index) {
return indexManager
.getIndexMetadata(index)
.map(metadata -> QueryParamTranslator.createQueryBuilder(queryParam, metadata))
.switchIfEmpty(Mono.fromSupplier(() -> QueryParamTranslator.createQueryBuilder(queryParam, null)));
}
protected Mono<CountRequest> createCountRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> indexes) {
QueryParam tempQueryParam = queryParam.clone();
tempQueryParam.setPaging(false);
tempQueryParam.setSorts(Collections.emptyList());
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
return Flux.fromIterable(indexes)
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder));
}
private Mono<CountRequest> createCountRequest(QueryParam queryParam, String... index) {
return indexManager
.getIndexesMetadata(index)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> createCountRequest(queryParam, list));
}
} }

View File

@@ -1,8 +1,6 @@
package org.jetlinks.community.elastic.search.timeseries; package org.jetlinks.community.elastic.search.timeseries;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.metadata.SimplePropertyMetadata;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexMetadata; import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.service.AggregationService; import org.jetlinks.community.elastic.search.service.AggregationService;
@@ -11,6 +9,8 @@ import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.TimeSeriesMetadata; import org.jetlinks.community.timeseries.TimeSeriesMetadata;
import org.jetlinks.community.timeseries.TimeSeriesMetric; import org.jetlinks.community.timeseries.TimeSeriesMetric;
import org.jetlinks.community.timeseries.TimeSeriesService; import org.jetlinks.community.timeseries.TimeSeriesService;
import org.jetlinks.core.metadata.SimplePropertyMetadata;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;

View File

@@ -2,13 +2,13 @@ package org.jetlinks.community.elastic.search.utils;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.hswebframework.ezorm.core.param.QueryParam; import org.hswebframework.ezorm.core.param.QueryParam;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import org.jetlinks.core.metadata.Converter; import org.jetlinks.core.metadata.Converter;
import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.PropertyMetadata; import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.types.DateTimeType; import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.GeoPoint; import org.jetlinks.core.metadata.types.GeoPoint;
import org.jetlinks.core.metadata.types.GeoType; import org.jetlinks.core.metadata.types.GeoType;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;

View File

@@ -9,10 +9,10 @@ import org.elasticsearch.search.sort.SortOrder;
import org.hswebframework.ezorm.core.param.QueryParam; import org.hswebframework.ezorm.core.param.QueryParam;
import org.hswebframework.ezorm.core.param.Sort; import org.hswebframework.ezorm.core.param.Sort;
import org.hswebframework.ezorm.core.param.Term; import org.hswebframework.ezorm.core.param.Term;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.parser.DefaultLinkTypeParser; import org.jetlinks.community.elastic.search.parser.DefaultLinkTypeParser;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.util.Map; import java.util.Map;

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>jetlinks-components</artifactId> <artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId> <groupId>org.jetlinks.community</groupId>
<version>1.9.0-SNAPSHOT</version> <version>1.10.0</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@@ -1,8 +1,8 @@
package org.jetlinks.community.gateway; package org.jetlinks.community.gateway;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.core.message.Message; import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.Transport; import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.community.network.NetworkType;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;

View File

@@ -2,42 +2,97 @@ package org.jetlinks.community.gateway;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import org.jetlinks.core.event.TopicPayload; import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.message.CommonDeviceMessage;
import org.jetlinks.core.message.CommonDeviceMessageReply;
import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.MessageType; import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.message.property.ReadPropertyMessageReply; import org.jetlinks.core.message.property.*;
import org.jetlinks.core.message.property.ReportPropertyMessage; import org.springframework.util.StringUtils;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.function.Supplier;
public class DeviceMessageUtils { public class DeviceMessageUtils {
@SuppressWarnings("all")
public static Optional<DeviceMessage> convert(TopicPayload message) { public static Optional<DeviceMessage> convert(TopicPayload message) {
return Optional.of(message.decode(DeviceMessage.class)); return Optional.of(message.decode(DeviceMessage.class));
} }
public static Optional<DeviceMessage> convert(ByteBuf payload) { public static Optional<DeviceMessage> convert(ByteBuf payload) {
try {
return MessageType.convertMessage(JSON.parseObject(payload.toString(StandardCharsets.UTF_8)));
} finally {
ReferenceCountUtil.safeRelease(payload);
}
}
return MessageType.convertMessage(JSON.parseObject(payload.toString(StandardCharsets.UTF_8))); public static void trySetProperties(DeviceMessage message, Map<String, Object> properties) {
if (message instanceof ReportPropertyMessage) {
((ReportPropertyMessage) message).setProperties(properties);
} else if (message instanceof ReadPropertyMessageReply) {
((ReadPropertyMessageReply) message).setProperties(properties);
} else if (message instanceof WritePropertyMessageReply) {
((WritePropertyMessageReply) message).setProperties(properties);
}
} }
public static Optional<Map<String, Object>> tryGetProperties(DeviceMessage message) { public static Optional<Map<String, Object>> tryGetProperties(DeviceMessage message) {
if (message instanceof ReportPropertyMessage) { if (message instanceof PropertyMessage) {
return Optional.ofNullable(((ReportPropertyMessage) message).getProperties()); return Optional.ofNullable(((PropertyMessage) message).getProperties());
} }
if (message instanceof ReadPropertyMessageReply) { return Optional.empty();
return Optional.ofNullable(((ReadPropertyMessageReply) message).getProperties()); }
}
if (message instanceof WritePropertyMessageReply) { public static Optional<Map<String, Long>> tryGetPropertySourceTimes(DeviceMessage message) {
return Optional.ofNullable(((WritePropertyMessageReply) message).getProperties()); if (message instanceof PropertyMessage) {
return Optional.ofNullable(((PropertyMessage) message).getPropertySourceTimes());
} }
return Optional.empty(); return Optional.empty();
} }
public static Optional<Map<String, String>> tryGetPropertyStates(DeviceMessage message) {
if (message instanceof PropertyMessage) {
return Optional.ofNullable(((PropertyMessage) message).getPropertyStates());
}
return Optional.empty();
}
public static List<Property> tryGetCompleteProperties(DeviceMessage message) {
if (message instanceof PropertyMessage) {
return ((PropertyMessage) message).getCompleteProperties();
}
return Collections.emptyList();
}
public static void trySetDeviceId(DeviceMessage message, String deviceId) {
if (message instanceof CommonDeviceMessage) {
((CommonDeviceMessage) message).setDeviceId(deviceId);
} else if (message instanceof CommonDeviceMessageReply) {
((CommonDeviceMessageReply<?>) message).setDeviceId(deviceId);
}
}
public static void trySetMessageId(DeviceMessage message, Supplier<String> messageId) {
if (StringUtils.hasText(message.getMessageId())) {
return;
}
if (message instanceof CommonDeviceMessage) {
((CommonDeviceMessage) message).setMessageId(messageId.get());
} else if (message instanceof CommonDeviceMessageReply) {
((CommonDeviceMessageReply<?>) message).setMessageId(messageId.get());
}
}
} }

View File

@@ -2,7 +2,6 @@ package org.jetlinks.community.gateway;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import lombok.Getter; import lombok.Getter;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.rule.engine.executor.PayloadType; import org.jetlinks.rule.engine.executor.PayloadType;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;

View File

@@ -1,6 +1,8 @@
package org.jetlinks.community.gateway; package org.jetlinks.community.gateway;
import lombok.*; import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import java.util.Collection; import java.util.Collection;
import java.util.stream.Collectors; import java.util.stream.Collectors;

View File

@@ -1,182 +0,0 @@
package org.jetlinks.community.gateway;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.BiFunction;
@Getter
@Setter
@EqualsAndHashCode(of = "part")
public class TopicPart {
private TopicPart parent;
private String part;
private volatile String topic;
private int depth;
private ConcurrentMap<String, TopicPart> child = new ConcurrentHashMap<>();
private Set<String> sessionId = new CopyOnWriteArraySet<>();
private static final AntPathMatcher matcher = new AntPathMatcher();
public TopicPart(TopicPart parent, String part) {
if (StringUtils.isEmpty(part) || part.equals("/")) {
this.part = "";
} else {
if (part.contains("/")) {
this.ofTopic(part);
} else {
this.part = part;
}
}
this.parent = parent;
if (null != parent) {
this.depth = parent.depth + 1;
}
}
public String getTopic() {
if (topic == null) {
TopicPart parent = getParent();
StringBuilder builder = new StringBuilder();
if (parent != null) {
String parentTopic = parent.getTopic();
builder.append(parentTopic).append(parentTopic.equals("/") ? "" : "/");
} else {
builder.append("/");
}
return topic = builder.append(part).toString();
}
return topic;
}
public TopicPart subscribe(String topic) {
return getOrDefault(topic, TopicPart::new);
}
public void addSessionId(String... sessionId) {
this.sessionId.addAll(Arrays.asList(sessionId));
}
public void removeSession(String... sessionId) {
this.sessionId.removeAll(Arrays.asList(sessionId));
}
private void ofTopic(String topic) {
String[] parts = topic.split("[/]", 2);
this.part = parts[0];
if (parts.length > 1) {
TopicPart part = new TopicPart(this, parts[1]);
this.child.put(part.part, part);
}
}
private TopicPart getOrDefault(String topic, BiFunction<TopicPart, String, TopicPart> mapping) {
if (topic.startsWith("/")) {
topic = topic.substring(1);
}
String[] parts = topic.split("[/]");
TopicPart part = child.computeIfAbsent(parts[0], _topic -> mapping.apply(this, _topic));
for (int i = 1; i < parts.length && part != null; i++) {
TopicPart parent = part;
part = part.child.computeIfAbsent(parts[i], _topic -> mapping.apply(parent, _topic));
}
return part;
}
public Mono<TopicPart> get(String topic) {
return Mono.justOrEmpty(getOrDefault(topic, ((topicPart, s) -> null)));
}
public Flux<TopicPart> find(String topic) {
return find(topic, this);
}
@Override
public String toString() {
return "topic: " + getTopic() + ", sessions: " + sessionId.size();
}
public static Flux<TopicPart> find(String topic,
TopicPart topicPart) {
return Flux.create(sink -> {
ArrayDeque<TopicPart> cache = new ArrayDeque<>();
cache.add(topicPart);
String[] topicParts = topic.split("[/]");
String nextPart = null;
while (!cache.isEmpty() && !sink.isCancelled()) {
TopicPart part = cache.poll();
if (part == null) {
break;
}
if (part.part.equals("**")
// || part.part.equals("*")
|| matcher.match(part.getTopic(), topic)
|| (matcher.match(topic, part.getTopic()))) {
sink.next(part);
}
//订阅了如 /device/**/event/*
if (part.part.equals("**")) {
TopicPart tmp = null;
for (int i = part.depth; i < topicParts.length; i++) {
tmp = part.child.get(topicParts[i]);
if (tmp != null) {
cache.add(tmp);
}
}
if (null != tmp) {
continue;
}
}
if ("**".equals(nextPart) || "*".equals(nextPart)) {
cache.addAll(part.child.values());
continue;
}
TopicPart next = part.child.get("**");
if (next != null) {
cache.add(next);
}
next = part.child.get("*");
if (next != null) {
cache.add(next);
}
if (part.depth + 1 >= topicParts.length) {
continue;
}
nextPart = topicParts[part.depth + 1];
if (nextPart.equals("*") || nextPart.equals("**")) {
cache.addAll(part.child.values());
continue;
}
next = part.child.get(nextPart);
if (next != null) {
cache.add(next);
}
}
sink.complete();
});
}
}

View File

@@ -22,6 +22,11 @@ public interface Message {
return new SimpleMessage(id, topic, null, Type.error, message); return new SimpleMessage(id, topic, null, Type.error, message);
} }
static Message error(String id, String topic, Throwable message) {
return new SimpleMessage(id, topic, null, Type.error, message.getMessage() == null ? message.getClass().getSimpleName() : message.getMessage());
}
static Message success(String id, String topic, Object payload) { static Message success(String id, String topic, Object payload) {
return new SimpleMessage(id, topic, payload, Type.result, null); return new SimpleMessage(id, topic, payload, Type.result, null);
} }
@@ -30,11 +35,16 @@ public interface Message {
return new SimpleMessage(id, null, null, Type.complete, null); return new SimpleMessage(id, null, null, Type.complete, null);
} }
static Message pong(String id) {
return new SimpleMessage(id, null, null, Type.pong, null);
}
enum Type { enum Type {
authError, authError,
result, result,
error, error,
complete complete,
ping,
pong
} }
} }

View File

@@ -3,25 +3,29 @@ package org.jetlinks.community.gateway.external.socket;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.authorization.Authentication;
import org.hswebframework.web.authorization.ReactiveAuthenticationManager; import org.hswebframework.web.authorization.ReactiveAuthenticationManager;
import org.hswebframework.web.authorization.token.UserToken; import org.hswebframework.web.authorization.token.UserToken;
import org.hswebframework.web.authorization.token.UserTokenManager; import org.hswebframework.web.authorization.token.UserTokenManager;
import org.hswebframework.web.logger.ReactiveLogger;
import org.jetlinks.community.gateway.external.Message; import org.jetlinks.community.gateway.external.Message;
import org.jetlinks.community.gateway.external.MessagingManager; import org.jetlinks.community.gateway.external.MessagingManager;
import org.jetlinks.community.gateway.external.SubscribeRequest; import org.jetlinks.community.gateway.external.SubscribeRequest;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession; import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.Disposable; import reactor.core.Disposable;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
@AllArgsConstructor @AllArgsConstructor
@Slf4j @Slf4j
@@ -60,8 +64,25 @@ public class WebSocketMessagingHandler implements WebSocketHandler {
.receive() .receive()
.doOnNext(message -> { .doOnNext(message -> {
try { try {
if (message.getType() == WebSocketMessage.Type.PONG) {
return;
}
if (message.getType() == WebSocketMessage.Type.PING) {
session
.send(Mono.just(session.pongMessage(DataBufferFactory::allocateBuffer)))
.subscribe();
return;
}
MessagingRequest request = JSON.parseObject(message.getPayloadAsText(), MessagingRequest.class); MessagingRequest request = JSON.parseObject(message.getPayloadAsText(), MessagingRequest.class);
if (request == null || request.getType() == MessagingRequest.Type.ping) { if (request == null) {
return;
}
if (request.getType() == MessagingRequest.Type.ping) {
session
.send(Mono.just(session.textMessage(JSON.toJSONString(
Message.pong(request.getId())
))))
.subscribe();
return; return;
} }
if (StringUtils.isEmpty(request.getId())) { if (StringUtils.isEmpty(request.getId())) {
@@ -73,33 +94,46 @@ public class WebSocketMessagingHandler implements WebSocketHandler {
} }
if (request.getType() == MessagingRequest.Type.sub) { if (request.getType() == MessagingRequest.Type.sub) {
//重复订阅 //重复订阅
if (subs.containsKey(request.getId())) { Disposable old = subs.get(request.getId());
if (old != null && !old.isDisposed()) {
return; return;
} }
subs.put(request.getId(), messagingManager Map<String, String> context = new HashMap<>();
context.put("userId", auth.getUser().getId());
context.put("userName", auth.getUser().getName());
Disposable sub = messagingManager
.subscribe(SubscribeRequest.of(request, auth)) .subscribe(SubscribeRequest.of(request, auth))
.onErrorResume(err -> Mono.just(Message.error(request.getId(), request.getTopic(), err.getMessage()))) .doOnEach(ReactiveLogger.onError(err -> log.error("{}", err.getMessage(), err)))
.onErrorResume(err -> Mono.just(Message.error(request.getId(), request.getTopic(), err)))
.map(msg -> session.textMessage(JSON.toJSONString(msg))) .map(msg -> session.textMessage(JSON.toJSONString(msg)))
.doOnComplete(() -> { .doOnComplete(() -> {
log.debug("complete subscription:{}", request.getTopic());
subs.remove(request.getId()); subs.remove(request.getId());
Mono.just(session.textMessage(JSON.toJSONString(Message.complete(request.getId())))) Mono.just(session.textMessage(JSON.toJSONString(Message.complete(request.getId()))))
.as(session::send) .as(session::send)
.subscribe(); .subscribe();
}) })
.flatMap(msg -> session.send(Mono.just(msg))) .doOnCancel(() -> {
.subscribe() log.debug("cancel subscription:{}", request.getTopic());
); subs.remove(request.getId());
})
.transform(session::send)
.subscriberContext(ReactiveLogger.start(context))
.subscriberContext(Context.of(Authentication.class, auth))
.subscribe();
if (!sub.isDisposed()) {
subs.put(request.getId(), sub);
}
} else if (request.getType() == MessagingRequest.Type.unsub) { } else if (request.getType() == MessagingRequest.Type.unsub) {
Optional.ofNullable(subs.remove(request.getId())) Optional.ofNullable(subs.remove(request.getId()))
.ifPresent(Disposable::dispose); .ifPresent(Disposable::dispose);
} else { } else {
session.send(Mono.just(session.textMessage(JSON.toJSONString( session.send(Mono.just(session.textMessage(JSON.toJSONString(
Message.error(request.getId(), request.getTopic(), "不支持的类型:" + request.getType()) Message.error(request.getId(), request.getTopic(), "不支持的类型:" + request.getType())
)))).subscribe(); )))).subscribe();
} }
} catch (Exception e) { } catch (Exception e) {
log.warn(e.getMessage(),e); log.warn(e.getMessage(), e);
session.send(Mono.just(session.textMessage(JSON.toJSONString( session.send(Mono.just(session.textMessage(JSON.toJSONString(
Message.error("illegal_argument", null, "消息格式错误") Message.error("illegal_argument", null, "消息格式错误")
)))).subscribe(); )))).subscribe();

View File

@@ -3,7 +3,6 @@ package org.jetlinks.community.gateway.monitor;
import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.MeterRegistry;
import lombok.AllArgsConstructor;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;

View File

@@ -2,7 +2,12 @@ package org.jetlinks.community.gateway.monitor.measurements;
import org.hswebframework.web.api.crud.entity.QueryParamEntity; import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.community.Interval; import org.jetlinks.community.Interval;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.gateway.monitor.GatewayTimeSeriesMetric;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.query.Aggregation; import org.jetlinks.community.timeseries.query.Aggregation;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.jetlinks.core.metadata.ConfigMetadata; import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.DefaultConfigMetadata; import org.jetlinks.core.metadata.DefaultConfigMetadata;
@@ -10,14 +15,8 @@ import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.EnumType; import org.jetlinks.core.metadata.types.EnumType;
import org.jetlinks.core.metadata.types.IntType; import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.StringType; import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.gateway.monitor.GatewayTimeSeriesMetric;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import java.time.Duration;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.Date; import java.util.Date;

View File

@@ -1,12 +1,11 @@
package org.jetlinks.community.gateway.monitor.measurements; package org.jetlinks.community.gateway.monitor.measurements;
import org.jetlinks.community.dashboard.MeasurementDefinition;
import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider; import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
import org.jetlinks.community.timeseries.TimeSeriesManager; import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.query.Aggregation; import org.jetlinks.community.timeseries.query.Aggregation;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import static org.jetlinks.community.dashboard.MeasurementDefinition.*; import static org.jetlinks.community.dashboard.MeasurementDefinition.of;
@Component @Component
public class DeviceGatewayMeasurementProvider extends StaticMeasurementProvider { public class DeviceGatewayMeasurementProvider extends StaticMeasurementProvider {

View File

@@ -12,29 +12,53 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/**
* 设备网关管理器
* <p>
* TCP UDP MQTT CoAP
*
* @author zhouhao
*/
@Component @Component
public class DefaultDeviceGatewayManager implements DeviceGatewayManager, BeanPostProcessor { public class DefaultDeviceGatewayManager implements DeviceGatewayManager, BeanPostProcessor {
private final DeviceGatewayPropertiesManager propertiesManager; private final DeviceGatewayPropertiesManager propertiesManager;
private Map<String, DeviceGatewayProvider> providers = new ConcurrentHashMap<>(); /**
* TCP MQTT的设备网关服务提供者
*/
private final Map<String, DeviceGatewayProvider> providers = new ConcurrentHashMap<>();
private Map<String, DeviceGateway> store = new ConcurrentHashMap<>(); /**
* 启动状态的设备网关
*/
private final Map<String, DeviceGateway> store = new ConcurrentHashMap<>();
public DefaultDeviceGatewayManager(DeviceGatewayPropertiesManager propertiesManager) { public DefaultDeviceGatewayManager(DeviceGatewayPropertiesManager propertiesManager) {
this.propertiesManager = propertiesManager; this.propertiesManager = propertiesManager;
} }
/**
* 获取设备网关,有则返回,没有就创建返回
*
* @param id 网关ID
* @return 设备网关
*/
private Mono<DeviceGateway> doGetGateway(String id) { private Mono<DeviceGateway> doGetGateway(String id) {
if (store.containsKey(id)) { if (store.containsKey(id)) {
return Mono.just(store.get(id)); return Mono.just(store.get(id));
} }
// 数据库查 DeviceGatewayEntity 转换成 DeviceGatewayProperties
// BeanMap中找provider 找不到就是不支持
// 创建设备网关
// double check 防止重复创建
return propertiesManager return propertiesManager
.getProperties(id) .getProperties(id)
.switchIfEmpty(Mono.error(()->new UnsupportedOperationException("网关配置[" + id + "]不存在"))) .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("网关配置[" + id + "]不存在")))
.flatMap(properties -> Mono .flatMap(properties -> Mono
.justOrEmpty(providers.get(properties.getProvider())) .justOrEmpty(providers.get(properties.getProvider()))
.switchIfEmpty(Mono.error(()->new UnsupportedOperationException("不支持的网络服务[" + properties.getProvider() + "]"))) .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("不支持的网络服务[" + properties.getProvider() + "]")))
.flatMap(provider -> provider .flatMap(provider -> provider
.createDeviceGateway(properties) .createDeviceGateway(properties)
.flatMap(gateway -> { .flatMap(gateway -> {

View File

@@ -7,9 +7,17 @@ import org.jetlinks.community.ValueObject;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
/**
* 设备网关属性外观类
* <p>
* 转换设备网关属性数据
* </p>
*
* @author zhouhao
*/
@Getter @Getter
@Setter @Setter
public class DeviceGatewayProperties implements ValueObject { public class DeviceGatewayProperties implements ValueObject {
private String id; private String id;
@@ -17,7 +25,7 @@ public class DeviceGatewayProperties implements ValueObject {
private String networkId; private String networkId;
private Map<String,Object> configuration=new HashMap<>(); private Map<String, Object> configuration = new HashMap<>();
@Override @Override
public Map<String, Object> values() { public Map<String, Object> values() {

View File

@@ -2,8 +2,19 @@ package org.jetlinks.community.gateway.supports;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
/**
* 设备网关属性管理器
*
* @author zhouhao
*/
public interface DeviceGatewayPropertiesManager { public interface DeviceGatewayPropertiesManager {
/**
* 获取网关的属性
*
* @param id 网关ID
* @return 网关属性
*/
Mono<DeviceGatewayProperties> getProperties(String id); Mono<DeviceGatewayProperties> getProperties(String id);

View File

@@ -4,6 +4,14 @@ import org.jetlinks.community.gateway.DeviceGateway;
import org.jetlinks.community.network.NetworkType; import org.jetlinks.community.network.NetworkType;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
/**
* 设备网关支持提供商,用于提供对各种设备网关的支持.在启动设备网关时,会根据对应的提供商以及配置来创建设备网关.
* 实现统一管理网关配置,动态创建设备网关.
*
* @author zhouhao
* @see DeviceGateway
* @since 1.0
*/
public interface DeviceGatewayProvider { public interface DeviceGatewayProvider {
String getId(); String getId();

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>jetlinks-components</artifactId> <artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId> <groupId>org.jetlinks.community</groupId>
<version>1.9.0-SNAPSHOT</version> <version>1.10.0</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
@@ -17,7 +17,7 @@
<dependency> <dependency>
<groupId>commons-io</groupId> <groupId>commons-io</groupId>
<artifactId>commons-io</artifactId> <artifactId>commons-io</artifactId>
<version>2.6</version> <version>2.7</version>
</dependency> </dependency>
<dependency> <dependency>

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>jetlinks-components</artifactId> <artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId> <groupId>org.jetlinks.community</groupId>
<version>1.9.0-SNAPSHOT</version> <version>1.10.0</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,10 @@ import lombok.Setter;
import org.jetlinks.community.logging.system.SerializableSystemLog; import org.jetlinks.community.logging.system.SerializableSystemLog;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.*; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ConfigurationProperties(prefix = "jetlinks.logging") @ConfigurationProperties(prefix = "jetlinks.logging")
@Getter @Getter

View File

@@ -1,13 +1,13 @@
package org.jetlinks.community.logging.event.handler; package org.jetlinks.community.logging.event.handler;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexMetadata; import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.service.ElasticSearchService; import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.logging.access.SerializableAccessLog; import org.jetlinks.community.logging.access.SerializableAccessLog;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.StringType;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;

View File

@@ -1,14 +1,14 @@
package org.jetlinks.community.logging.event.handler; package org.jetlinks.community.logging.event.handler;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexMetadata; import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.service.ElasticSearchService; import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.logging.system.SerializableSystemLog; import org.jetlinks.community.logging.system.SerializableSystemLog;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.StringType;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>network-component</artifactId> <artifactId>network-component</artifactId>
<groupId>org.jetlinks.community</groupId> <groupId>org.jetlinks.community</groupId>
<version>1.9.0-SNAPSHOT</version> <version>1.10.0</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@@ -1,7 +1,7 @@
package org.jetlinks.community.network.mqtt.client; package org.jetlinks.community.network.mqtt.client;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.community.network.Network; import org.jetlinks.community.network.Network;
import org.jetlinks.core.message.codec.MqttMessage;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;

View File

@@ -7,14 +7,14 @@ import io.vertx.mqtt.MqttClientOptions;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.bean.FastBeanCopier; import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.utils.ExpressionUtils; import org.hswebframework.web.utils.ExpressionUtils;
import org.jetlinks.community.network.*;
import org.jetlinks.community.network.security.CertificateManager;
import org.jetlinks.community.network.security.VertxKeyCertTrustOptions;
import org.jetlinks.core.metadata.ConfigMetadata; import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DefaultConfigMetadata; import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.BooleanType; import org.jetlinks.core.metadata.types.BooleanType;
import org.jetlinks.core.metadata.types.IntType; import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.StringType; import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.community.network.*;
import org.jetlinks.community.network.security.CertificateManager;
import org.jetlinks.community.network.security.VertxKeyCertTrustOptions;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;

View File

@@ -5,11 +5,11 @@ import io.vertx.core.buffer.Buffer;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.core.message.codec.MqttMessage; import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.SimpleMqttMessage; import org.jetlinks.core.message.codec.SimpleMqttMessage;
import org.jetlinks.core.topic.Topic; import org.jetlinks.core.topic.Topic;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import reactor.core.Disposable; import reactor.core.Disposable;
import reactor.core.Disposables; import reactor.core.Disposables;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;

View File

@@ -3,11 +3,11 @@ package org.jetlinks.community.network.mqtt.executor;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.hswebframework.web.bean.FastBeanCopier; import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.dict.EnumDict; import org.hswebframework.web.dict.EnumDict;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.community.network.DefaultNetworkType; import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkManager; import org.jetlinks.community.network.NetworkManager;
import org.jetlinks.community.network.PubSubType; import org.jetlinks.community.network.PubSubType;
import org.jetlinks.community.network.mqtt.client.MqttClient; import org.jetlinks.community.network.mqtt.client.MqttClient;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.rule.engine.api.RuleConstants; import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.RuleData; import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataCodecs; import org.jetlinks.rule.engine.api.RuleDataCodecs;

View File

@@ -2,6 +2,15 @@ package org.jetlinks.community.network.mqtt.gateway.device;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetlinks.community.gateway.DeviceGateway;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.community.gateway.monitor.GatewayMonitors;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.mqtt.client.MqttClient;
import org.jetlinks.community.network.mqtt.gateway.device.session.MqttClientSession;
import org.jetlinks.community.network.mqtt.gateway.device.session.UnknownDeviceMqttClientSession;
import org.jetlinks.community.network.utils.DeviceGatewayHelper;
import org.jetlinks.core.ProtocolSupport; import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.ProtocolSupports; import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceOperator;
@@ -13,15 +22,6 @@ import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.FromDeviceMessageContext; import org.jetlinks.core.message.codec.FromDeviceMessageContext;
import org.jetlinks.core.message.codec.Transport; import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.session.DeviceSessionManager; import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.community.gateway.DeviceGateway;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.community.gateway.monitor.GatewayMonitors;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.mqtt.client.MqttClient;
import org.jetlinks.community.network.mqtt.gateway.device.session.MqttClientSession;
import org.jetlinks.community.network.mqtt.gateway.device.session.UnknownDeviceMqttClientSession;
import org.jetlinks.community.network.utils.DeviceGatewayHelper;
import org.jetlinks.supports.server.DecodedClientMessageHandler; import org.jetlinks.supports.server.DecodedClientMessageHandler;
import reactor.core.Disposable; import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.EmitterProcessor;
@@ -51,6 +51,8 @@ public class MqttClientDeviceGateway implements DeviceGateway {
private final String protocol; private final String protocol;
private final int qos;
private final ProtocolSupports protocolSupport; private final ProtocolSupports protocolSupport;
private final EmitterProcessor<Message> processor = EmitterProcessor.create(false); private final EmitterProcessor<Message> processor = EmitterProcessor.create(false);
@@ -72,7 +74,8 @@ public class MqttClientDeviceGateway implements DeviceGateway {
String protocol, String protocol,
DeviceSessionManager sessionManager, DeviceSessionManager sessionManager,
DecodedClientMessageHandler clientMessageHandler, DecodedClientMessageHandler clientMessageHandler,
List<String> topics) { List<String> topics,
int qos) {
this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor(id); this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor(id);
this.id = Objects.requireNonNull(id, "id"); this.id = Objects.requireNonNull(id, "id");
@@ -82,6 +85,7 @@ public class MqttClientDeviceGateway implements DeviceGateway {
this.protocol = Objects.requireNonNull(protocol, "protocol"); this.protocol = Objects.requireNonNull(protocol, "protocol");
this.topics = Objects.requireNonNull(topics, "topics"); this.topics = Objects.requireNonNull(topics, "topics");
this.helper = new DeviceGatewayHelper(registry, sessionManager, clientMessageHandler); this.helper = new DeviceGatewayHelper(registry, sessionManager, clientMessageHandler);
this.qos = qos;
} }
@@ -95,7 +99,7 @@ public class MqttClientDeviceGateway implements DeviceGateway {
} }
disposable disposable
.add(mqttClient .add(mqttClient
.subscribe(topics) .subscribe(topics,qos)
.filter((msg) -> started.get()) .filter((msg) -> started.get())
.flatMap(mqttMessage -> { .flatMap(mqttMessage -> {
AtomicReference<Duration> timeoutRef = new AtomicReference<>(); AtomicReference<Duration> timeoutRef = new AtomicReference<>();

View File

@@ -1,8 +1,5 @@
package org.jetlinks.community.network.mqtt.gateway.device; package org.jetlinks.community.network.mqtt.gateway.device;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.community.gateway.DeviceGateway; import org.jetlinks.community.gateway.DeviceGateway;
import org.jetlinks.community.gateway.supports.DeviceGatewayProperties; import org.jetlinks.community.gateway.supports.DeviceGatewayProperties;
import org.jetlinks.community.gateway.supports.DeviceGatewayProvider; import org.jetlinks.community.gateway.supports.DeviceGatewayProvider;
@@ -10,6 +7,9 @@ import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkManager; import org.jetlinks.community.network.NetworkManager;
import org.jetlinks.community.network.NetworkType; import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.mqtt.client.MqttClient; import org.jetlinks.community.network.mqtt.client.MqttClient;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.supports.server.DecodedClientMessageHandler; import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@@ -64,6 +64,7 @@ public class MqttClientDeviceGatewayProvider implements DeviceGatewayProvider {
String protocol = (String) properties.getConfiguration().get("protocol"); String protocol = (String) properties.getConfiguration().get("protocol");
String topics = (String) properties.getConfiguration().get("topics"); String topics = (String) properties.getConfiguration().get("topics");
int qos = properties.getInt("qos").orElse(0);
Objects.requireNonNull(topics, "topics"); Objects.requireNonNull(topics, "topics");
return new MqttClientDeviceGateway(properties.getId(), return new MqttClientDeviceGateway(properties.getId(),
@@ -73,7 +74,8 @@ public class MqttClientDeviceGatewayProvider implements DeviceGatewayProvider {
protocol, protocol,
sessionManager, sessionManager,
clientMessageHandler, clientMessageHandler,
Arrays.asList(topics.split("[,;\n]")) Arrays.asList(topics.split("[,;\n]")),
qos
); );
}); });

View File

@@ -4,6 +4,16 @@ import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.logger.ReactiveLogger; import org.hswebframework.web.logger.ReactiveLogger;
import org.jetlinks.community.gateway.DeviceGateway;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.community.gateway.monitor.GatewayMonitors;
import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
import org.jetlinks.community.network.mqtt.server.MqttConnection;
import org.jetlinks.community.network.mqtt.server.MqttServer;
import org.jetlinks.community.network.utils.DeviceGatewayHelper;
import org.jetlinks.core.ProtocolSupport; import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.device.AuthenticationResponse; import org.jetlinks.core.device.AuthenticationResponse;
import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceOperator;
@@ -20,16 +30,6 @@ import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.session.DeviceSession; import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.DeviceSessionManager; import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.core.server.session.ReplaceableDeviceSession; import org.jetlinks.core.server.session.ReplaceableDeviceSession;
import org.jetlinks.community.gateway.DeviceGateway;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.community.gateway.monitor.GatewayMonitors;
import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
import org.jetlinks.community.network.mqtt.server.MqttConnection;
import org.jetlinks.community.network.mqtt.server.MqttServer;
import org.jetlinks.community.network.utils.DeviceGatewayHelper;
import org.jetlinks.supports.server.DecodedClientMessageHandler; import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import reactor.core.Disposable; import reactor.core.Disposable;

View File

@@ -1,8 +1,5 @@
package org.jetlinks.community.network.mqtt.gateway.device; package org.jetlinks.community.network.mqtt.gateway.device;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.community.gateway.DeviceGateway; import org.jetlinks.community.gateway.DeviceGateway;
import org.jetlinks.community.gateway.supports.DeviceGatewayProperties; import org.jetlinks.community.gateway.supports.DeviceGatewayProperties;
import org.jetlinks.community.gateway.supports.DeviceGatewayProvider; import org.jetlinks.community.gateway.supports.DeviceGatewayProvider;
@@ -10,6 +7,9 @@ import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkManager; import org.jetlinks.community.network.NetworkManager;
import org.jetlinks.community.network.NetworkType; import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.mqtt.server.MqttServer; import org.jetlinks.community.network.mqtt.server.MqttServer;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.supports.server.DecodedClientMessageHandler; import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;

View File

@@ -2,14 +2,14 @@ package org.jetlinks.community.network.mqtt.gateway.device.session;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.community.network.mqtt.client.MqttClient;
import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.codec.DefaultTransport; import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.EncodedMessage; import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MqttMessage; import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.Transport; import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.session.DeviceSession; import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.community.network.mqtt.client.MqttClient;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.time.Duration; import java.time.Duration;

View File

@@ -2,12 +2,12 @@ package org.jetlinks.community.network.mqtt.gateway.device.session;
import lombok.Getter; import lombok.Getter;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor; import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.community.network.mqtt.server.MqttConnection;
import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.codec.EncodedMessage; import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MqttMessage; import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.Transport; import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.session.DeviceSession; import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.community.network.mqtt.server.MqttConnection;
import org.jetlinks.core.server.session.ReplaceableDeviceSession; import org.jetlinks.core.server.session.ReplaceableDeviceSession;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;

View File

@@ -1,13 +1,13 @@
package org.jetlinks.community.network.mqtt.gateway.device.session; package org.jetlinks.community.network.mqtt.gateway.device.session;
import lombok.Getter; import lombok.Getter;
import org.jetlinks.community.network.mqtt.client.MqttClient;
import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.codec.DefaultTransport; import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.EncodedMessage; import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MqttMessage; import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.Transport; import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.session.DeviceSession; import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.community.network.mqtt.client.MqttClient;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
public class UnknownDeviceMqttClientSession implements DeviceSession { public class UnknownDeviceMqttClientSession implements DeviceSession {

View File

@@ -28,7 +28,6 @@ import reactor.core.publisher.Mono;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration; import java.time.Duration;
import java.util.Optional; import java.util.Optional;
import java.util.function.Consumer; import java.util.function.Consumer;

View File

@@ -7,9 +7,9 @@ import io.vertx.mqtt.MqttServerOptions;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.bean.FastBeanCopier; import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.network.*; import org.jetlinks.community.network.*;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.community.network.security.CertificateManager; import org.jetlinks.community.network.security.CertificateManager;
import org.jetlinks.community.network.security.VertxKeyCertTrustOptions; import org.jetlinks.community.network.security.VertxKeyCertTrustOptions;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>network-component</artifactId> <artifactId>network-component</artifactId>
<groupId>org.jetlinks.community</groupId> <groupId>org.jetlinks.community</groupId>
<version>1.9.0-SNAPSHOT</version> <version>1.10.0</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@@ -19,6 +19,11 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/**
* 默认网络管理器
*
* @author zhouhao
*/
@Component @Component
@Slf4j @Slf4j
public class DefaultNetworkManager implements NetworkManager, BeanPostProcessor { public class DefaultNetworkManager implements NetworkManager, BeanPostProcessor {
@@ -26,9 +31,9 @@ public class DefaultNetworkManager implements NetworkManager, BeanPostProcessor
private final NetworkConfigManager configManager; private final NetworkConfigManager configManager;
private Map<String, Map<String, Network>> store = new ConcurrentHashMap<>(); private final Map<String, Map<String, Network>> store = new ConcurrentHashMap<>();
private Map<String, NetworkProvider<Object>> providerSupport = new ConcurrentHashMap<>(); private final Map<String, NetworkProvider<Object>> providerSupport = new ConcurrentHashMap<>();
public DefaultNetworkManager(NetworkConfigManager configManager) { public DefaultNetworkManager(NetworkConfigManager configManager) {
this.configManager = configManager; this.configManager = configManager;
@@ -49,7 +54,12 @@ public class DefaultNetworkManager implements NetworkManager, BeanPostProcessor
.subscribe(t -> this.checkNetwork()); .subscribe(t -> this.checkNetwork());
} }
/**
* 检查网络 把需要加载的网络组件启动起来
*/
protected void checkNetwork() { protected void checkNetwork() {
// 获取并过滤所有停止的网络组件
// 重新加载启动状态的网络组件
Flux.fromIterable(store.values()) Flux.fromIterable(store.values())
.flatMapIterable(Map::values) .flatMapIterable(Map::values)
.filter(i -> !i.isAlive()) .filter(i -> !i.isAlive())
@@ -84,6 +94,14 @@ public class DefaultNetworkManager implements NetworkManager, BeanPostProcessor
.map(n -> (T) n); .map(n -> (T) n);
} }
/**
* 如果store中不存在网络组件就创建存在就重新加载
*
* @param provider 网络组件支持提供商
* @param id 网络组件唯一标识
* @param properties 网络组件配置
* @return 网络组件
*/
public Network doCreate(NetworkProvider<Object> provider, String id, Object properties) { public Network doCreate(NetworkProvider<Object> provider, String id, Object properties) {
return getNetworkStore(provider.getType()).compute(id, (s, network) -> { return getNetworkStore(provider.getType()).compute(id, (s, network) -> {
if (network == null) { if (network == null) {

View File

@@ -2,6 +2,11 @@ package org.jetlinks.community.network;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
/**
* 网络组件配置管理器
*
* @author zhouhao
*/
public interface NetworkConfigManager { public interface NetworkConfigManager {
Mono<NetworkProperties> getConfig(NetworkType networkType, String id); Mono<NetworkProperties> getConfig(NetworkType networkType, String id);

View File

@@ -4,13 +4,48 @@ import reactor.core.publisher.Mono;
import java.util.List; import java.util.List;
/**
* 网络服务管理器
* <p>
* 管理所有的网络组件
*
* @author zhouhao
* @since 1.0
*/
public interface NetworkManager { public interface NetworkManager {
/**
* 根据ID获取网络组件否则根据type和id创建网络组件并返回
*
* @param type 网络类型
* @param id 网络组件id
* @param <T> NetWork子类泛型
* @return 网络组件
*/
<T extends Network> Mono<T> getNetwork(NetworkType type, String id); <T extends Network> Mono<T> getNetwork(NetworkType type, String id);
/**
* 获取所有的网络组件支持提供商
*
* @return 网络组件支持提供商
*/
List<NetworkProvider<?>> getProviders(); List<NetworkProvider<?>> getProviders();
Mono<Void> reload(NetworkType type, String id); /**
* 重新加载网络组件
*
* @param type 网络类型
* @param id 网络组件ID
* @return void
*/
Mono<Void> reload(NetworkType type, String id);
Mono<Void> shutdown(NetworkType type, String id); /**
* 停止网络组件
*
* @param type 网络类型
* @param id 网络组件ID
* @return void
*/
Mono<Void> shutdown(NetworkType type, String id);
} }

View File

@@ -6,6 +6,11 @@ import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509KeyManager; import javax.net.ssl.X509KeyManager;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;
/**
* 证书接口
*
* @author zhouhao
*/
public interface Certificate { public interface Certificate {
String getId(); String getId();

View File

@@ -2,6 +2,11 @@ package org.jetlinks.community.network.security;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
/**
* 证书管理接口
*
* @author zhouhao
*/
public interface CertificateManager { public interface CertificateManager {
Mono<Certificate> getCertificate(String id); Mono<Certificate> getCertificate(String id);

View File

@@ -13,13 +13,18 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.stream.Stream; import java.util.stream.Stream;
/**
* 默认证书
*
* @author zhouhao
*/
public class DefaultCertificate implements Certificate { public class DefaultCertificate implements Certificate {
@Getter @Getter
private String id; private final String id;
@Getter @Getter
private String name; private final String name;
private KeyStoreHelper keyHelper; private KeyStoreHelper keyHelper;
@@ -131,12 +136,12 @@ public class DefaultCertificate implements Certificate {
return EMPTY; return EMPTY;
} }
return Arrays.stream(trustHelper return Arrays.stream(trustHelper
.getTrustMgrs()) .getTrustMgrs())
.filter(X509TrustManager.class::isInstance) .filter(X509TrustManager.class::isInstance)
.map(X509TrustManager.class::cast) .map(X509TrustManager.class::cast)
.map(X509TrustManager::getAcceptedIssuers) .map(X509TrustManager::getAcceptedIssuers)
.flatMap(Stream::of) .flatMap(Stream::of)
.toArray(X509Certificate[]::new); .toArray(X509Certificate[]::new);
} }
@Override @Override

View File

@@ -2,14 +2,18 @@ package org.jetlinks.community.network.utils;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.jetlinks.community.PropertyConstants; import org.jetlinks.community.PropertyConstants;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.*; import org.jetlinks.core.message.*;
import org.jetlinks.core.message.state.DeviceStateCheckMessage;
import org.jetlinks.core.message.state.DeviceStateCheckMessageReply;
import org.jetlinks.core.server.session.ChildrenDeviceSession; import org.jetlinks.core.server.session.ChildrenDeviceSession;
import org.jetlinks.core.server.session.DeviceSession; import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.DeviceSessionManager; import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.core.server.session.KeepOnlineSession; import org.jetlinks.core.server.session.KeepOnlineSession;
import org.jetlinks.supports.server.DecodedClientMessageHandler; import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.time.Duration; import java.time.Duration;
@@ -17,6 +21,14 @@ import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
/**
* 设备网关处理工具
* <p>
* 封装常用的设备消息处理操作
* </p>
*
* @author zhouhao
*/
@AllArgsConstructor @AllArgsConstructor
public class DeviceGatewayHelper { public class DeviceGatewayHelper {
@@ -45,42 +57,84 @@ public class DeviceGatewayHelper {
} }
protected Mono<Void> handleChildrenDeviceMessage(String deviceId, DeviceMessage children) { protected Mono<Void> handleChildrenDeviceMessage(String deviceId, DeviceMessage children) {
if (deviceId == null
|| children instanceof DeviceStateCheckMessage
|| children instanceof DeviceStateCheckMessageReply
|| children instanceof DisconnectDeviceMessage
|| children instanceof DisconnectDeviceMessageReply) {
return Mono.empty();
}
if (children instanceof DeviceMessageReply) {
DeviceMessageReply reply = ((DeviceMessageReply) children);
if (!reply.isSuccess()) {
return Mono.empty();
}
}
ChildrenDeviceSession deviceSession = sessionManager.getSession(deviceId, children.getDeviceId()); ChildrenDeviceSession deviceSession = sessionManager.getSession(deviceId, children.getDeviceId());
//子设备离线 //子设备离线或者注销
if(children instanceof DeviceOfflineMessage){ if (children instanceof DeviceOfflineMessage || children instanceof DeviceUnRegisterMessage) {
//注销会话,这里子设备可能会收到多次离线消息 //注销会话,这里子设备可能会收到多次离线消息
//注销会话一次离线,消息网关转发子设备消息一次 //注销会话一次离线,消息网关转发子设备消息一次
if (deviceSession != null && children instanceof DeviceOfflineMessage) {
//忽略离线消息,因为注销会话时,会自动发送一个离线消息
children.addHeader(Headers.ignore, true);
}
return sessionManager return sessionManager
.unRegisterChildren(deviceId,children.getDeviceId()) .unRegisterChildren(deviceId, children.getDeviceId())
.then(); .then();
} }
if (deviceSession == null && null != children.getDeviceId()) { if (deviceSession == null && null != children.getDeviceId()) {
Mono<Void> then = sessionManager //忽略上线消息,因为注册会话时,会自动发送一个上线消息
if (children instanceof DeviceOnlineMessage) {
children.addHeader(Headers.ignore, true);
}
Mono<Void> registerSession = sessionManager
.registerChildren(deviceId, children.getDeviceId()) .registerChildren(deviceId, children.getDeviceId())
.then(); .then();
//子设备注册 //子设备注册
if (isDoRegister(children)) { if (isDoRegister(children)) {
then = Mono.delay(Duration.ofSeconds(2)) return Mono
.then(then); .delay(Duration.ofSeconds(2))
.then(registry
.getDevice(children.getDeviceId())
.flatMap(device -> device
//没有配置状态自管理才自动上线
.getSelfConfig(DeviceConfigKey.selfManageState)
.defaultIfEmpty(false)
.filter(Boolean.FALSE::equals)
.flatMap(ignore -> registerSession))
);
} }
return then; return registerSession;
} }
return Mono.empty(); return Mono.empty();
} }
/**
* 处理来自设备网关的设备消息
*
* @param message 设备消息
* @param sessionBuilder 设备操作
* @param sessionConsumer 设备消费
* @param deviceNotFoundListener 异常监听
* @return 设备操作
*/
public Mono<DeviceOperator> handleDeviceMessage(DeviceMessage message, public Mono<DeviceOperator> handleDeviceMessage(DeviceMessage message,
Function<DeviceOperator, DeviceSession> sessionBuilder, Function<DeviceOperator, DeviceSession> sessionBuilder,
Consumer<DeviceSession> sessionConsumer, Consumer<DeviceSession> sessionConsumer,
Supplier<Mono<DeviceOperator>> deviceNotFoundListener) { Supplier<Mono<DeviceOperator>> deviceNotFoundListener) {
String deviceId = message.getDeviceId(); String deviceId = message.getDeviceId();
if (StringUtils.isEmpty(deviceId)) {
return Mono.empty();
}
Mono<Void> then = Mono.empty(); Mono<Void> then = Mono.empty();
boolean doHandle = true; boolean doHandle = true;
if (message instanceof ChildDeviceMessage) { if (message instanceof ChildDeviceMessage) {
DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessage) message).getChildDeviceMessage(); DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessage) message).getChildDeviceMessage();
then = handleChildrenDeviceMessage(deviceId,childrenMessage); then = handleChildrenDeviceMessage(deviceId, childrenMessage);
} else if (message instanceof ChildDeviceMessageReply) { } else if (message instanceof ChildDeviceMessageReply) {
DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage(); DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage();
then = handleChildrenDeviceMessage(deviceId,childrenMessage); then = handleChildrenDeviceMessage(deviceId, childrenMessage);
} else if (message instanceof DeviceOfflineMessage) { } else if (message instanceof DeviceOfflineMessage) {
//设备离线消息 //设备离线消息
DeviceSession session = sessionManager.unregister(deviceId); DeviceSession session = sessionManager.unregister(deviceId);
@@ -118,19 +172,21 @@ public class DeviceGatewayHelper {
})) }))
.flatMap(device -> { .flatMap(device -> {
DeviceSession newSession = sessionBuilder.apply(device); DeviceSession newSession = sessionBuilder.apply(device);
//保持会话,在低功率设备上,可能无法保持mqtt长连接. if (null != newSession) {
if (message.getHeader(Headers.keepOnline).orElse(false)) { //保持会话,在低功率设备上,可能无法保持mqtt长连接.
int timeout = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds); if (message.getHeader(Headers.keepOnline).orElse(false)) {
newSession = new KeepOnlineSession(newSession, Duration.ofSeconds(timeout)); int timeout = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
} newSession = new KeepOnlineSession(newSession, Duration.ofSeconds(timeout));
sessionManager.register(newSession); }
sessionConsumer.accept(newSession); sessionManager.register(newSession);
newSession.keepAlive(); sessionConsumer.accept(newSession);
if (!(message instanceof DeviceRegisterMessage) && newSession.keepAlive();
!(message instanceof DeviceOnlineMessage)) { if (!(message instanceof DeviceRegisterMessage) &&
return messageHandler !(message instanceof DeviceOnlineMessage)) {
.handleMessage(device, message) return messageHandler
.thenReturn(device); .handleMessage(device, message)
.thenReturn(device);
}
} }
return Mono.just(device); return Mono.just(device);
}) })
@@ -138,6 +194,16 @@ public class DeviceGatewayHelper {
.flatMap(then::thenReturn) .flatMap(then::thenReturn)
; ;
} else { } else {
//消息中指定保存在线
if (message.getHeader(Headers.keepOnline).orElse(false)
&& !(session instanceof KeepOnlineSession)) {
Duration timeout = message
.getHeader(Headers.keepOnlineTimeoutSeconds)
.map(Duration::ofSeconds)
.orElse(Duration.ofHours(1));
//替换session
session = sessionManager.replace(session, new KeepOnlineSession(session, timeout));
}
sessionConsumer.accept(session); sessionConsumer.accept(session);
session.keepAlive(); session.keepAlive();
if (doHandle) { if (doHandle) {

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>jetlinks-components</artifactId> <artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId> <groupId>org.jetlinks.community</groupId>
<version>1.9.0-SNAPSHOT</version> <version>1.10.0</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
<packaging>pom</packaging> <packaging>pom</packaging>

View File

@@ -5,7 +5,7 @@
<parent> <parent>
<artifactId>network-component</artifactId> <artifactId>network-component</artifactId>
<groupId>org.jetlinks.community</groupId> <groupId>org.jetlinks.community</groupId>
<version>1.9.0-SNAPSHOT</version> <version>1.10.0</version>
<relativePath>../pom.xml</relativePath> <relativePath>../pom.xml</relativePath>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@@ -2,6 +2,7 @@ package org.jetlinks.community.network.tcp.client;
import org.jetlinks.community.network.Network; import org.jetlinks.community.network.Network;
import org.jetlinks.community.network.tcp.TcpMessage; import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.core.server.ClientConnection;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@@ -14,7 +15,7 @@ import java.time.Duration;
* @author zhouhao * @author zhouhao
* @version 1.0 * @version 1.0
*/ */
public interface TcpClient extends Network { public interface TcpClient extends Network, ClientConnection {
/** /**
* 获取客户端远程地址 * 获取客户端远程地址

View File

@@ -7,7 +7,6 @@ import org.jetlinks.community.network.tcp.parser.PayloadParserType;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional;
@Getter @Getter
@Setter @Setter

View File

@@ -12,6 +12,7 @@ import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType; import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.tcp.TcpMessage; import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.parser.PayloadParser; import org.jetlinks.community.network.tcp.parser.PayloadParser;
import org.jetlinks.core.message.codec.EncodedMessage;
import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink; import reactor.core.publisher.FluxSink;
@@ -28,27 +29,23 @@ import java.util.function.Function;
@Slf4j @Slf4j
public class VertxTcpClient implements TcpClient { public class VertxTcpClient implements TcpClient {
public volatile NetClient client;
public NetSocket socket;
volatile PayloadParser payloadParser;
@Getter @Getter
private final String id; private final String id;
private final List<Runnable> disconnectListener = new CopyOnWriteArrayList<>();
private final EmitterProcessor<TcpMessage> processor = EmitterProcessor.create(false);
private final FluxSink<TcpMessage> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
private final boolean serverClient;
public volatile NetClient client;
public NetSocket socket;
volatile PayloadParser payloadParser;
@Setter @Setter
private long keepAliveTimeoutMs = Duration.ofMinutes(10).toMillis(); private long keepAliveTimeoutMs = Duration.ofMinutes(10).toMillis();
private volatile long lastKeepAliveTime = System.currentTimeMillis(); private volatile long lastKeepAliveTime = System.currentTimeMillis();
private final List<Runnable> disconnectListener = new CopyOnWriteArrayList<>(); public VertxTcpClient(String id, boolean serverClient) {
this.id = id;
private final EmitterProcessor<TcpMessage> processor = EmitterProcessor.create(false); this.serverClient = serverClient;
}
private final FluxSink<TcpMessage> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
private final boolean serverClient;
@Override @Override
public void keepAlive() { public void keepAlive() {
@@ -67,6 +64,43 @@ public class VertxTcpClient implements TcpClient {
} }
} }
@Override
public InetSocketAddress address() {
return getRemoteAddress();
}
@Override
public Mono<Void> sendMessage(EncodedMessage message) {
return Mono
.create((sink) -> {
if (socket == null) {
sink.error(new SocketException("socket closed"));
return;
}
Buffer buffer = Buffer.buffer(message.getPayload());
socket.write(buffer, r -> {
keepAlive();
if (r.succeeded()) {
sink.success();
} else {
sink.error(r.cause());
}
});
});
}
@Override
public Flux<EncodedMessage> receiveMessage() {
return this
.subscribe()
.cast(EncodedMessage.class);
}
@Override
public void disconnect() {
shutdown();
}
@Override @Override
public boolean isAlive() { public boolean isAlive() {
return socket != null && (keepAliveTimeoutMs < 0 || System.currentTimeMillis() - lastKeepAliveTime < keepAliveTimeoutMs); return socket != null && (keepAliveTimeoutMs < 0 || System.currentTimeMillis() - lastKeepAliveTime < keepAliveTimeoutMs);
@@ -77,11 +111,11 @@ public class VertxTcpClient implements TcpClient {
return true; return true;
} }
public VertxTcpClient(String id,boolean serverClient) { /**
this.id = id; * 接收TCP消息
this.serverClient=serverClient; *
} * @param message TCP消息
*/
protected void received(TcpMessage message) { protected void received(TcpMessage message) {
if (processor.getPending() > processor.getBufferSize() / 2) { if (processor.getPending() > processor.getBufferSize() / 2) {
log.warn("tcp [{}] message pending {} ,drop message:{}", processor.getPending(), getRemoteAddress(), message.toString()); log.warn("tcp [{}] message pending {} ,drop message:{}", processor.getPending(), getRemoteAddress(), message.toString());
@@ -139,7 +173,7 @@ public class VertxTcpClient implements TcpClient {
execute(runnable); execute(runnable);
} }
disconnectListener.clear(); disconnectListener.clear();
if(serverClient){ if (serverClient) {
processor.onComplete(); processor.onComplete();
} }
} }
@@ -152,6 +186,11 @@ public class VertxTcpClient implements TcpClient {
this.client = client; this.client = client;
} }
/**
* 设置客户端消息解析器
*
* @param payloadParser 消息解析器
*/
public void setRecordParser(PayloadParser payloadParser) { public void setRecordParser(PayloadParser payloadParser) {
synchronized (this) { synchronized (this) {
if (null != this.payloadParser && this.payloadParser != payloadParser) { if (null != this.payloadParser && this.payloadParser != payloadParser) {
@@ -167,6 +206,11 @@ public class VertxTcpClient implements TcpClient {
} }
} }
/**
* socket处理
*
* @param socket socket
*/
public void setSocket(NetSocket socket) { public void setSocket(NetSocket socket) {
synchronized (this) { synchronized (this) {
Objects.requireNonNull(payloadParser); Objects.requireNonNull(payloadParser);
@@ -193,21 +237,8 @@ public class VertxTcpClient implements TcpClient {
@Override @Override
public Mono<Boolean> send(TcpMessage message) { public Mono<Boolean> send(TcpMessage message) {
return Mono.<Boolean>create((sink) -> { return sendMessage(message)
if (socket == null) { .thenReturn(true);
sink.error(new SocketException("socket closed"));
return;
}
Buffer buffer = Buffer.buffer(message.getPayload());
socket.write(buffer, r -> {
keepAlive();
if (r.succeeded()) {
sink.success(true);
} else {
sink.error(r.cause());
}
});
});
} }
@Override @Override

View File

@@ -5,12 +5,11 @@ import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions; import io.vertx.core.net.NetClientOptions;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.bean.FastBeanCopier; import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.core.Values;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.community.network.*; import org.jetlinks.community.network.*;
import org.jetlinks.community.network.security.CertificateManager; import org.jetlinks.community.network.security.CertificateManager;
import org.jetlinks.community.network.security.VertxKeyCertTrustOptions; import org.jetlinks.community.network.security.VertxKeyCertTrustOptions;
import org.jetlinks.community.network.tcp.parser.PayloadParserBuilder; import org.jetlinks.community.network.tcp.parser.PayloadParserBuilder;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;

View File

@@ -2,13 +2,13 @@ package org.jetlinks.community.network.tcp.device;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.client.TcpClient;
import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.codec.EncodedMessage; import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.Transport; import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.session.DeviceSession; import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.client.TcpClient;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;

View File

@@ -3,17 +3,6 @@ package org.jetlinks.community.network.tcp.device;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.logger.ReactiveLogger; import org.hswebframework.web.logger.ReactiveLogger;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.FromDeviceMessageContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.community.gateway.DeviceGateway; import org.jetlinks.community.gateway.DeviceGateway;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor; import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.community.gateway.monitor.GatewayMonitors; import org.jetlinks.community.gateway.monitor.GatewayMonitors;
@@ -24,6 +13,20 @@ import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.client.TcpClient; import org.jetlinks.community.network.tcp.client.TcpClient;
import org.jetlinks.community.network.tcp.server.TcpServer; import org.jetlinks.community.network.tcp.server.TcpServer;
import org.jetlinks.community.network.utils.DeviceGatewayHelper; import org.jetlinks.community.network.utils.DeviceGatewayHelper;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.FromDeviceMessageContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.DeviceGatewayContext;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.supports.server.DecodedClientMessageHandler; import org.jetlinks.supports.server.DecodedClientMessageHandler;
import reactor.core.Disposable; import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.EmitterProcessor;
@@ -45,6 +48,9 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
@Getter @Getter
private final String id; private final String id;
/**
* 维护所有创建的tcp server
*/
private final TcpServer tcpServer; private final TcpServer tcpServer;
private final String protocol; private final String protocol;
@@ -57,6 +63,9 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
private final DeviceGatewayMonitor gatewayMonitor; private final DeviceGatewayMonitor gatewayMonitor;
/**
* 连接计数器
*/
private final LongAdder counter = new LongAdder(); private final LongAdder counter = new LongAdder();
private final EmitterProcessor<Message> processor = EmitterProcessor.create(false); private final EmitterProcessor<Message> processor = EmitterProcessor.create(false);
@@ -64,10 +73,11 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
private final FluxSink<Message> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER); private final FluxSink<Message> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
private final AtomicBoolean started = new AtomicBoolean(); private final AtomicBoolean started = new AtomicBoolean();
private Disposable disposable;
private final DeviceGatewayHelper helper; private final DeviceGatewayHelper helper;
/**
* 数据流控开关
*/
private Disposable disposable;
public TcpServerDeviceGateway(String id, public TcpServerDeviceGateway(String id,
String protocol, String protocol,
@@ -90,116 +100,45 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
return supports.getProtocol(protocol); return supports.getProtocol(protocol);
} }
/**
* 当前总链接
*
* @return 当前总链接
*/
@Override @Override
public long totalConnection() { public long totalConnection() {
return counter.sum(); return counter.sum();
} }
/**
* 传输协议
*
* @return {@link org.jetlinks.core.message.codec.DefaultTransport}
*/
@Override @Override
public Transport getTransport() { public Transport getTransport() {
return DefaultTransport.TCP; return DefaultTransport.TCP;
} }
/**
* 网络类型
*
* @return {@link org.jetlinks.community.network.DefaultNetworkType}
*/
@Override @Override
public NetworkType getNetworkType() { public NetworkType getNetworkType() {
return DefaultNetworkType.TCP_SERVER; return DefaultNetworkType.TCP_SERVER;
} }
/**
class TcpConnection { * 启动网关
final TcpClient client; */
final AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
final AtomicReference<DeviceSession> sessionRef = new AtomicReference<>();
final InetSocketAddress address;
TcpConnection(TcpClient client) {
this.client = client;
this.address = client.getRemoteAddress();
gatewayMonitor.totalConnection(counter.sum());
client.onDisconnect(() -> {
counter.decrement();
gatewayMonitor.disconnected();
gatewayMonitor.totalConnection(counter.sum());
});
gatewayMonitor.connected();
DeviceSession session = sessionManager.getSession(client.getId());
if (session == null) {
session = new UnknownTcpDeviceSession(client.getId(), client, getTransport()) {
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
}
@Override
public void setKeepAliveTimeout(Duration timeout) {
keepaliveTimeout.set(timeout);
}
@Override
public Optional<InetSocketAddress> getClientAddress() {
return Optional.of(address);
}
};
}
sessionRef.set(session);
}
Mono<Void> accept() {
return client
.subscribe()
.filter(tcp -> started.get())
.publishOn(Schedulers.parallel())
.flatMap(this::handleTcpMessage)
.onErrorResume((err) -> {
log.error(err.getMessage(), err);
client.shutdown();
return Mono.empty();
})
.then()
.doOnCancel(client::shutdown);
}
Mono<Void> handleTcpMessage(TcpMessage message) {
return getProtocol()
.flatMap(pt -> pt.getMessageCodec(getTransport()))
.flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(sessionRef.get(), message, registry)))
.cast(DeviceMessage.class)
.doOnNext(msg -> gatewayMonitor.receivedMessage())
.flatMap(this::handleDeviceMessage)
.doOnEach(ReactiveLogger.onError(err -> log
.error("处理TCP[{}]消息失败:\n{}", address, message, err)))
.onErrorResume((err) -> Mono.fromRunnable(client::reset))
.then();
}
Mono<Void> handleDeviceMessage(DeviceMessage message) {
if (processor.hasDownstreams()) {
sink.next(message);
}
return helper
.handleDeviceMessage(message,
device -> new TcpDeviceSession(device, client, getTransport(), gatewayMonitor),
DeviceGatewayHelper
.applySessionKeepaliveTimeout(message, keepaliveTimeout::get)
.andThen(session -> {
TcpDeviceSession deviceSession = session.unwrap(TcpDeviceSession.class);
deviceSession.setClient(client);
sessionRef.set(deviceSession);
}),
() -> log.warn("无法从tcp[{}]消息中获取设备信息:{}", address, message)
)
.then();
}
}
private void doStart() { private void doStart() {
if (started.getAndSet(true) || disposable != null) { if (started.getAndSet(true) || disposable != null) {
return; return;
} }
// 从TCPServer中获取连接的client
// client实例化为TcpConnection之后处理消息
disposable = tcpServer disposable = tcpServer
.handleConnection() .handleConnection()
.publishOn(Schedulers.parallel()) .publishOn(Schedulers.parallel())
@@ -241,4 +180,134 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
public boolean isAlive() { public boolean isAlive() {
return started.get(); return started.get();
} }
/**
* TCP 客户端连接
*/
class TcpConnection implements DeviceGatewayContext {
final TcpClient client;
final AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
final AtomicReference<DeviceSession> sessionRef = new AtomicReference<>();
final InetSocketAddress address;
TcpConnection(TcpClient client) {
this.client = client;
this.address = client.getRemoteAddress();
gatewayMonitor.totalConnection(counter.sum());
client.onDisconnect(() -> {
counter.decrement();
gatewayMonitor.disconnected();
gatewayMonitor.totalConnection(counter.sum());
});
gatewayMonitor.connected();
DeviceSession session = sessionManager.getSession(client.getId());
if (session == null) {
session = new UnknownTcpDeviceSession(client.getId(), client, getTransport()) {
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
}
@Override
public void setKeepAliveTimeout(Duration timeout) {
keepaliveTimeout.set(timeout);
client.setKeepAliveTimeout(timeout);
}
@Override
public Optional<InetSocketAddress> getClientAddress() {
return Optional.of(address);
}
};
}
sessionRef.set(session);
}
/**
* 接收消息
*
* @return void
*/
Mono<Void> accept() {
return getProtocol()
.flatMap(protocol -> protocol.onClientConnect(getTransport(), client, this))
.then(
client
.subscribe()
.filter(tcp -> started.get())
.publishOn(Schedulers.parallel())
.flatMap(this::handleTcpMessage)
.onErrorResume((err) -> {
log.error(err.getMessage(), err);
client.shutdown();
return Mono.empty();
})
.then()
)
.doOnCancel(client::shutdown);
}
/**
* 处理TCP消息 ==>> 设备消息
*
* @param message tcp消息
* @return void
*/
Mono<Void> handleTcpMessage(TcpMessage message) {
return getProtocol()
.flatMap(pt -> pt.getMessageCodec(getTransport()))
.flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(sessionRef.get(), message, registry)))
.cast(DeviceMessage.class)
.doOnNext(msg -> gatewayMonitor.receivedMessage())
.flatMap(this::handleDeviceMessage)
.doOnEach(ReactiveLogger.onError(err -> log.error("处理TCP[{}]消息失败:\n{}",
address,
message
, err)))
.onErrorResume((err) -> Mono.fromRunnable(client::reset))
.then();
}
/**
* 处理设备消息
*
* @param message 设备消息
* @return void
*/
Mono<Void> handleDeviceMessage(DeviceMessage message) {
if (processor.hasDownstreams()) {
sink.next(message);
}
return helper
.handleDeviceMessage(message,
device -> new TcpDeviceSession(device, client, getTransport(), gatewayMonitor),
DeviceGatewayHelper
.applySessionKeepaliveTimeout(message, keepaliveTimeout::get)
.andThen(session -> {
TcpDeviceSession deviceSession = session.unwrap(TcpDeviceSession.class);
deviceSession.setClient(client);
sessionRef.set(deviceSession);
}),
() -> log.warn("无法从tcp[{}]消息中获取设备信息:{}", address, message)
)
.then();
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
@Override
public Mono<DeviceProductOperator> getProduct(String productId) {
return registry.getProduct(productId);
}
@Override
public Mono<Void> onMessage(DeviceMessage message) {
return handleDeviceMessage(message);
}
}
} }

View File

@@ -1,8 +1,5 @@
package org.jetlinks.community.network.tcp.device; package org.jetlinks.community.network.tcp.device;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.community.gateway.DeviceGateway; import org.jetlinks.community.gateway.DeviceGateway;
import org.jetlinks.community.gateway.supports.DeviceGatewayProperties; import org.jetlinks.community.gateway.supports.DeviceGatewayProperties;
import org.jetlinks.community.gateway.supports.DeviceGatewayProvider; import org.jetlinks.community.gateway.supports.DeviceGatewayProvider;
@@ -10,11 +7,20 @@ import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkManager; import org.jetlinks.community.network.NetworkManager;
import org.jetlinks.community.network.NetworkType; import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.tcp.server.TcpServer; import org.jetlinks.community.network.tcp.server.TcpServer;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.supports.server.DecodedClientMessageHandler; import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
/**
* TCP服务设备网关提供商
*
* @author zhouhao
* @since 1.0
*/
@Component @Component
public class TcpServerDeviceGatewayProvider implements DeviceGatewayProvider { public class TcpServerDeviceGatewayProvider implements DeviceGatewayProvider {
@@ -60,18 +66,18 @@ public class TcpServerDeviceGatewayProvider implements DeviceGatewayProvider {
public Mono<DeviceGateway> createDeviceGateway(DeviceGatewayProperties properties) { public Mono<DeviceGateway> createDeviceGateway(DeviceGatewayProperties properties) {
return networkManager return networkManager
.<TcpServer>getNetwork(getNetworkType(), properties.getNetworkId()) .<TcpServer>getNetwork(getNetworkType(), properties.getNetworkId())
.map(mqttServer -> { .map(server -> {
String protocol = (String) properties.getConfiguration().get("protocol"); String protocol = (String) properties.getConfiguration().get("protocol");
Assert.hasText(protocol,"protocol can not be empty"); Assert.hasText(protocol, "protocol can not be empty");
return new TcpServerDeviceGateway(properties.getId(), return new TcpServerDeviceGateway(properties.getId(),
protocol, protocol,
protocolSupports, protocolSupports,
registry, registry,
messageHandler, messageHandler,
sessionManager, sessionManager,
mqttServer server
); );
}); });
} }

View File

@@ -1,12 +1,12 @@
package org.jetlinks.community.network.tcp.device; package org.jetlinks.community.network.tcp.device;
import lombok.Getter; import lombok.Getter;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.client.TcpClient;
import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.codec.EncodedMessage; import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.Transport; import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.session.DeviceSession; import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.client.TcpClient;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
class UnknownTcpDeviceSession implements DeviceSession { class UnknownTcpDeviceSession implements DeviceSession {

View File

@@ -1,7 +1,6 @@
package org.jetlinks.community.network.tcp.parser; package org.jetlinks.community.network.tcp.parser;
import org.jetlinks.community.ValueObject; import org.jetlinks.community.ValueObject;
import org.jetlinks.core.Values;
import org.jetlinks.community.network.tcp.parser.strateies.DelimitedPayloadParserBuilder; import org.jetlinks.community.network.tcp.parser.strateies.DelimitedPayloadParserBuilder;
import org.jetlinks.community.network.tcp.parser.strateies.DirectPayloadParserBuilder; import org.jetlinks.community.network.tcp.parser.strateies.DirectPayloadParserBuilder;
import org.jetlinks.community.network.tcp.parser.strateies.FixLengthPayloadParserBuilder; import org.jetlinks.community.network.tcp.parser.strateies.FixLengthPayloadParserBuilder;

View File

@@ -1,7 +1,6 @@
package org.jetlinks.community.network.tcp.parser; package org.jetlinks.community.network.tcp.parser;
import org.jetlinks.community.ValueObject; import org.jetlinks.community.ValueObject;
import org.jetlinks.core.Values;
public interface PayloadParserBuilder { public interface PayloadParserBuilder {

View File

@@ -3,8 +3,6 @@ package org.jetlinks.community.network.tcp.parser.strateies;
import io.vertx.core.parsetools.RecordParser; import io.vertx.core.parsetools.RecordParser;
import org.apache.commons.lang.StringEscapeUtils; import org.apache.commons.lang.StringEscapeUtils;
import org.jetlinks.community.ValueObject; import org.jetlinks.community.ValueObject;
import org.jetlinks.core.Value;
import org.jetlinks.core.Values;
import org.jetlinks.community.network.tcp.parser.PayloadParserType; import org.jetlinks.community.network.tcp.parser.PayloadParserType;
public class DelimitedPayloadParserBuilder extends VertxPayloadParserBuilder { public class DelimitedPayloadParserBuilder extends VertxPayloadParserBuilder {

View File

@@ -2,7 +2,6 @@ package org.jetlinks.community.network.tcp.parser.strateies;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import org.jetlinks.community.ValueObject; import org.jetlinks.community.ValueObject;
import org.jetlinks.core.Values;
import org.jetlinks.community.network.tcp.parser.DirectRecordParser; import org.jetlinks.community.network.tcp.parser.DirectRecordParser;
import org.jetlinks.community.network.tcp.parser.PayloadParser; import org.jetlinks.community.network.tcp.parser.PayloadParser;
import org.jetlinks.community.network.tcp.parser.PayloadParserBuilderStrategy; import org.jetlinks.community.network.tcp.parser.PayloadParserBuilderStrategy;

View File

@@ -2,8 +2,6 @@ package org.jetlinks.community.network.tcp.parser.strateies;
import io.vertx.core.parsetools.RecordParser; import io.vertx.core.parsetools.RecordParser;
import org.jetlinks.community.ValueObject; import org.jetlinks.community.ValueObject;
import org.jetlinks.core.Value;
import org.jetlinks.core.Values;
import org.jetlinks.community.network.tcp.parser.PayloadParserType; import org.jetlinks.community.network.tcp.parser.PayloadParserType;
public class FixLengthPayloadParserBuilder extends VertxPayloadParserBuilder { public class FixLengthPayloadParserBuilder extends VertxPayloadParserBuilder {

View File

@@ -44,6 +44,10 @@ public class PipePayloadParser implements PayloadParser {
private final AtomicInteger currentPipe = new AtomicInteger(); private final AtomicInteger currentPipe = new AtomicInteger();
public Buffer newBuffer() {
return Buffer.buffer();
}
public PipePayloadParser result(String buffer) { public PipePayloadParser result(String buffer) {
return result(Buffer.buffer(buffer)); return result(Buffer.buffer(buffer));
} }

Some files were not shown because too many files have changed in this diff Show More