This commit is contained in:
zhouhao 2022-06-14 17:51:43 +08:00
parent 5f871ebd13
commit cabc095367
7 changed files with 78 additions and 103 deletions

View File

@ -90,36 +90,36 @@
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-logging</artifactId>
<version>1.12.0</version>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<version>1.12.0</version>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>1.12.0</version>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-jaeger</artifactId>
<version>1.12.0</version>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.45.0</version>
<version>1.47.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.45.0</version>
<version>1.47.0</version>
</dependency>
<dependency>
@ -133,6 +133,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -2,9 +2,7 @@ package org.jetlinks.community.configure.device;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.guava.CaffeinatedGuava;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.web.crud.annotation.EnableEasyormRepository;
import org.jetlinks.community.micrometer.MeterRegistryManager;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.config.ConfigStorageManager;
@ -27,7 +25,6 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration(proxyBeanMethods = false)
@EnableEasyormRepository("org.jetlinks.community.configure.device.PersistentSessionEntity")
@ConditionalOnBean(ProtocolSupports.class)
public class DeviceClusterConfiguration {
@ -64,10 +61,9 @@ public class DeviceClusterConfiguration {
@Bean(initMethod = "init", destroyMethod = "shutdown")
@ConditionalOnBean(RpcManager.class)
public PersistenceDeviceSessionManager deviceSessionManager(RpcManager rpcManager,
ReactiveRepository<PersistentSessionEntity, String> repository) {
public PersistenceDeviceSessionManager deviceSessionManager(RpcManager rpcManager) {
return new PersistenceDeviceSessionManager(rpcManager,repository);
return new PersistenceDeviceSessionManager(rpcManager);
}
@ConditionalOnBean(DecodedClientMessageHandler.class)
@ -86,11 +82,4 @@ public class DeviceClusterConfiguration {
}
@Bean(initMethod = "init")
public DeviceSessionMonitor deviceSessionMonitor(DeviceSessionManager sessionManager,
MeterRegistryManager registryManager){
return new DeviceSessionMonitor(registryManager,sessionManager,"gateway-server-session");
}
}

View File

@ -1,41 +0,0 @@
package org.jetlinks.community.configure.device;
import io.micrometer.core.instrument.Gauge;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import org.jetlinks.community.micrometer.MeterRegistryManager;
import org.jetlinks.core.device.session.DeviceSessionManager;
import reactor.core.publisher.Mono;
import java.util.concurrent.Callable;
@AllArgsConstructor
public class DeviceSessionMonitor {
private MeterRegistryManager registryManager;
private DeviceSessionManager sessionManager;
private String name;
public void init() {
Gauge.builder(name, this::getTotalSession)
.tag("server", sessionManager.getCurrentServerId())
.register(registryManager.getMeterRegister("device_metrics"));
}
@SneakyThrows
@SuppressWarnings("all")
private long getTotalSession() {
Mono<Long> session = sessionManager.totalSessions(true);
Long val = null;
if (session instanceof Callable) {
val = ((Callable<Long>) session).call();
} else {
val = session
.toFuture()
.getNow(0L);
}
return val == null ? 0 : val;
}
}

View File

@ -1,12 +1,17 @@
package org.jetlinks.community.configure.device;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.MVStoreException;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.session.DeviceSessionEvent;
import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.PersistentSession;
import org.jetlinks.community.configure.cluster.Cluster;
import org.jetlinks.supports.device.session.ClusterDeviceSessionManager;
import org.springframework.beans.BeansException;
import org.springframework.boot.CommandLineRunner;
@ -17,6 +22,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.io.File;
import java.util.function.Function;
import java.util.function.Supplier;
@ -24,18 +30,46 @@ import java.util.function.Supplier;
public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager implements CommandLineRunner, ApplicationContextAware {
private Supplier<DeviceRegistry> registry;
private final ReactiveRepository<PersistentSessionEntity, String> repository;
private MVMap<String, PersistentSessionEntity> repository;
public PersistenceDeviceSessionManager(RpcManager rpcManager,
ReactiveRepository<PersistentSessionEntity, String> repository) {
@Getter
@Setter
private String filePath;
public PersistenceDeviceSessionManager(RpcManager rpcManager) {
super(rpcManager);
this.repository = repository;
}
static MVMap<String, PersistentSessionEntity> initStore(String file) {
File f = new File(file);
if (!f.getParentFile().exists()) {
f.getParentFile().mkdirs();
}
Supplier<MVMap<String, PersistentSessionEntity>>
builder = () -> {
MVStore store = new MVStore.Builder()
.fileName(file)
.cacheSize(1)
.open();
return store.openMap("device-session");
};
try {
return builder.get();
} catch (MVStoreException e) {
log.warn("load session from {} error,delete it and init.", file, e);
f.delete();
return builder.get();
}
}
@Override
public void init() {
super.init();
if (filePath == null) {
filePath = "./data/sessions-" + Cluster.id();
}
repository = initStore(filePath);
disposable.add(
listenEvent(event -> {
//移除持久化的会话
@ -59,6 +93,7 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
.map(session -> session.unwrap(PersistentSession.class))
.as(this::tryPersistent)
.block();
repository.store.close();
}
@Override
@ -68,7 +103,6 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
}
if ((old == null || !old.isWrapFrom(PersistentSession.class))
&& newSession.isWrapFrom(PersistentSession.class)) {
//todo 批量处理?
return this
.tryPersistent(Flux.just(newSession.unwrap(PersistentSession.class)))
.thenReturn(newSession);
@ -81,7 +115,10 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
return sessions
.flatMap(session -> PersistentSessionEntity.from(getCurrentServerId(), session, registry.get()))
.distinct(PersistentSessionEntity::getId)
.as(repository::save)
.doOnNext(e -> {
log.debug("persistent device[{}] session", e.getDeviceId());
repository.put(e.getDeviceId(), e);
})
.onErrorResume(err -> {
log.warn("persistent session error", err);
return Mono.empty();
@ -104,17 +141,14 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
}
Mono<Void> removePersistentSession(PersistentSession session) {
return repository
.deleteById(session.getId())
.then();
repository.remove(session.getId());
return Mono.empty();
}
@Override
public void run(String... args) throws Exception {
repository
.createQuery()
.where(PersistentSessionEntity::getServerId, getCurrentServerId())
.fetch()
Flux.fromIterable(repository.values())
.flatMap(this::resumeSession)
.subscribe();
}

View File

@ -6,7 +6,6 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.commons.codec.binary.Base64;
import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType;
import org.hswebframework.ezorm.rdb.mapping.annotation.Comment;
import org.hswebframework.web.api.crud.entity.GenericEntity;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.server.session.DeviceSessionProvider;
@ -15,16 +14,10 @@ import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
import javax.persistence.Column;
import javax.persistence.Index;
import javax.persistence.Table;
import java.sql.JDBCType;
@Getter
@Setter
@Table(name = "dev_sessions", indexes = @Index(
name = "idx_session_server", columnList = "server_id"
))
@Comment("设备会话信息表")
@Generated
public class PersistentSessionEntity extends GenericEntity<String> {

View File

@ -34,11 +34,6 @@
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>org.hswebframework</groupId>
<artifactId>hsweb-easy-orm-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>

32
pom.xml
View File

@ -16,26 +16,27 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.locales>zh_CN</project.build.locales>
<spring.boot.version>2.5.12</spring.boot.version>
<spring.boot.version>2.5.13</spring.boot.version>
<java.version>1.8</java.version>
<project.build.jdk>${java.version}</project.build.jdk>
<hsweb.framework.version>4.0.14-SNAPSHOT</hsweb.framework.version>
<easyorm.version>4.0.14-SNAPSHOT</easyorm.version>
<easyorm.version>4.1.0-SNAPSHOT</easyorm.version>
<hsweb.expands.version>3.0.2</hsweb.expands.version>
<jetlinks.version>1.2.0-SNAPSHOT</jetlinks.version>
<r2dbc.version>Arabba-SR10</r2dbc.version>
<netty.version>4.1.73.Final</netty.version>
<r2dbc.version>Borca-SR1</r2dbc.version>
<netty.version>4.1.74.Final</netty.version>
<elasticsearch.version>7.11.2</elasticsearch.version>
<reactor.excel.version>1.0.3-SNAPSHOT</reactor.excel.version>
<reactor.ql.version>1.0.14-SNAPSHOT</reactor.ql.version>
<californium.version>3.3.1</californium.version>
<fastjson.version>1.2.83</fastjson.version>
<reactor.version>2020.0.6</reactor.version>
<vertx.version>4.2.3</vertx.version>
<reactor.version>2020.0.18</reactor.version>
<vertx.version>4.3.0</vertx.version>
<log4j.version>2.17.1</log4j.version>
<logback.version>1.2.9</logback.version>
<springdoc.version>1.6.6</springdoc.version>
<jackson.version>2.13.2.20220328</jackson.version>
<opentelemetry.version>1.13.0</opentelemetry.version>
</properties>
@ -161,6 +162,12 @@
<dependencies>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.1.210</version>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-common</artifactId>
@ -352,7 +359,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
<version>31.0.1-jre</version>
</dependency>
<dependency>
@ -397,13 +404,6 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.hswebframework</groupId>
<artifactId>hsweb-easy-orm-elasticsearch</artifactId>
<version>${easyorm.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
@ -448,14 +448,14 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.16.2</version>
<version>1.17.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.16.2</version>
<version>1.17.2</version>
<scope>test</scope>
</dependency>