diff --git a/jetlinks-components/configure-component/pom.xml b/jetlinks-components/configure-component/pom.xml index c3a4fa07..47c9b86e 100644 --- a/jetlinks-components/configure-component/pom.xml +++ b/jetlinks-components/configure-component/pom.xml @@ -90,36 +90,36 @@ io.opentelemetry opentelemetry-exporter-logging - 1.12.0 + 1.14.0 io.opentelemetry opentelemetry-sdk-trace - 1.12.0 + 1.14.0 io.opentelemetry opentelemetry-sdk - 1.12.0 + 1.14.0 io.opentelemetry opentelemetry-exporter-jaeger - 1.12.0 + 1.14.0 io.grpc grpc-protobuf - 1.45.0 + 1.47.0 io.grpc grpc-netty-shaded - 1.45.0 + 1.47.0 @@ -133,6 +133,11 @@ ${project.version} + + com.h2database + h2 + + \ No newline at end of file diff --git a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/DeviceClusterConfiguration.java b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/DeviceClusterConfiguration.java index dce9d51b..15d60f76 100644 --- a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/DeviceClusterConfiguration.java +++ b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/DeviceClusterConfiguration.java @@ -2,9 +2,7 @@ package org.jetlinks.community.configure.device; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.guava.CaffeinatedGuava; -import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository; import org.hswebframework.web.crud.annotation.EnableEasyormRepository; -import org.jetlinks.community.micrometer.MeterRegistryManager; import org.jetlinks.core.ProtocolSupports; import org.jetlinks.core.cluster.ClusterManager; import org.jetlinks.core.config.ConfigStorageManager; @@ -27,7 +25,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration(proxyBeanMethods = false) -@EnableEasyormRepository("org.jetlinks.community.configure.device.PersistentSessionEntity") @ConditionalOnBean(ProtocolSupports.class) public class DeviceClusterConfiguration { @@ -64,10 +61,9 @@ public class DeviceClusterConfiguration { @Bean(initMethod = "init", destroyMethod = "shutdown") @ConditionalOnBean(RpcManager.class) - public PersistenceDeviceSessionManager deviceSessionManager(RpcManager rpcManager, - ReactiveRepository repository) { + public PersistenceDeviceSessionManager deviceSessionManager(RpcManager rpcManager) { - return new PersistenceDeviceSessionManager(rpcManager,repository); + return new PersistenceDeviceSessionManager(rpcManager); } @ConditionalOnBean(DecodedClientMessageHandler.class) @@ -86,11 +82,4 @@ public class DeviceClusterConfiguration { } - @Bean(initMethod = "init") - public DeviceSessionMonitor deviceSessionMonitor(DeviceSessionManager sessionManager, - MeterRegistryManager registryManager){ - - return new DeviceSessionMonitor(registryManager,sessionManager,"gateway-server-session"); - } - } diff --git a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/DeviceSessionMonitor.java b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/DeviceSessionMonitor.java deleted file mode 100644 index 6aa2475d..00000000 --- a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/DeviceSessionMonitor.java +++ /dev/null @@ -1,41 +0,0 @@ -package org.jetlinks.community.configure.device; - -import io.micrometer.core.instrument.Gauge; -import lombok.AllArgsConstructor; -import lombok.SneakyThrows; -import org.jetlinks.community.micrometer.MeterRegistryManager; -import org.jetlinks.core.device.session.DeviceSessionManager; -import reactor.core.publisher.Mono; - -import java.util.concurrent.Callable; - -@AllArgsConstructor -public class DeviceSessionMonitor { - - private MeterRegistryManager registryManager; - - private DeviceSessionManager sessionManager; - - private String name; - - public void init() { - Gauge.builder(name, this::getTotalSession) - .tag("server", sessionManager.getCurrentServerId()) - .register(registryManager.getMeterRegister("device_metrics")); - } - - @SneakyThrows - @SuppressWarnings("all") - private long getTotalSession() { - Mono session = sessionManager.totalSessions(true); - Long val = null; - if (session instanceof Callable) { - val = ((Callable) session).call(); - } else { - val = session - .toFuture() - .getNow(0L); - } - return val == null ? 0 : val; - } -} diff --git a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistenceDeviceSessionManager.java b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistenceDeviceSessionManager.java index d9af6eff..641cb39b 100644 --- a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistenceDeviceSessionManager.java +++ b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistenceDeviceSessionManager.java @@ -1,12 +1,17 @@ package org.jetlinks.community.configure.device; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository; +import org.h2.mvstore.MVMap; +import org.h2.mvstore.MVStore; +import org.h2.mvstore.MVStoreException; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.device.session.DeviceSessionEvent; import org.jetlinks.core.rpc.RpcManager; import org.jetlinks.core.server.session.DeviceSession; import org.jetlinks.core.server.session.PersistentSession; +import org.jetlinks.community.configure.cluster.Cluster; import org.jetlinks.supports.device.session.ClusterDeviceSessionManager; import org.springframework.beans.BeansException; import org.springframework.boot.CommandLineRunner; @@ -17,6 +22,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import javax.annotation.Nonnull; +import java.io.File; import java.util.function.Function; import java.util.function.Supplier; @@ -24,18 +30,46 @@ import java.util.function.Supplier; public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager implements CommandLineRunner, ApplicationContextAware { private Supplier registry; - private final ReactiveRepository repository; + private MVMap repository; - public PersistenceDeviceSessionManager(RpcManager rpcManager, - ReactiveRepository repository) { + @Getter + @Setter + private String filePath; + + public PersistenceDeviceSessionManager(RpcManager rpcManager) { super(rpcManager); - this.repository = repository; + } + + static MVMap initStore(String file) { + File f = new File(file); + if (!f.getParentFile().exists()) { + f.getParentFile().mkdirs(); + } + Supplier> + builder = () -> { + MVStore store = new MVStore.Builder() + .fileName(file) + .cacheSize(1) + .open(); + return store.openMap("device-session"); + }; + try { + return builder.get(); + } catch (MVStoreException e) { + log.warn("load session from {} error,delete it and init.", file, e); + f.delete(); + return builder.get(); + } } @Override public void init() { - super.init(); + if (filePath == null) { + filePath = "./data/sessions-" + Cluster.id(); + } + repository = initStore(filePath); + disposable.add( listenEvent(event -> { //移除持久化的会话 @@ -59,6 +93,7 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager .map(session -> session.unwrap(PersistentSession.class)) .as(this::tryPersistent) .block(); + repository.store.close(); } @Override @@ -68,7 +103,6 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager } if ((old == null || !old.isWrapFrom(PersistentSession.class)) && newSession.isWrapFrom(PersistentSession.class)) { - //todo 批量处理? return this .tryPersistent(Flux.just(newSession.unwrap(PersistentSession.class))) .thenReturn(newSession); @@ -81,7 +115,10 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager return sessions .flatMap(session -> PersistentSessionEntity.from(getCurrentServerId(), session, registry.get())) .distinct(PersistentSessionEntity::getId) - .as(repository::save) + .doOnNext(e -> { + log.debug("persistent device[{}] session", e.getDeviceId()); + repository.put(e.getDeviceId(), e); + }) .onErrorResume(err -> { log.warn("persistent session error", err); return Mono.empty(); @@ -104,17 +141,14 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager } Mono removePersistentSession(PersistentSession session) { - return repository - .deleteById(session.getId()) - .then(); + repository.remove(session.getId()); + return Mono.empty(); } @Override public void run(String... args) throws Exception { - repository - .createQuery() - .where(PersistentSessionEntity::getServerId, getCurrentServerId()) - .fetch() + + Flux.fromIterable(repository.values()) .flatMap(this::resumeSession) .subscribe(); } diff --git a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistentSessionEntity.java b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistentSessionEntity.java index 73e7e830..e33b12ac 100644 --- a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistentSessionEntity.java +++ b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistentSessionEntity.java @@ -6,7 +6,6 @@ import lombok.Getter; import lombok.Setter; import org.apache.commons.codec.binary.Base64; import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType; -import org.hswebframework.ezorm.rdb.mapping.annotation.Comment; import org.hswebframework.web.api.crud.entity.GenericEntity; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.server.session.DeviceSessionProvider; @@ -15,16 +14,10 @@ import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; import javax.persistence.Column; -import javax.persistence.Index; -import javax.persistence.Table; import java.sql.JDBCType; @Getter @Setter -@Table(name = "dev_sessions", indexes = @Index( - name = "idx_session_server", columnList = "server_id" -)) -@Comment("设备会话信息表") @Generated public class PersistentSessionEntity extends GenericEntity { diff --git a/jetlinks-components/elasticsearch-component/pom.xml b/jetlinks-components/elasticsearch-component/pom.xml index 17085600..a619d3be 100644 --- a/jetlinks-components/elasticsearch-component/pom.xml +++ b/jetlinks-components/elasticsearch-component/pom.xml @@ -34,11 +34,6 @@ reactor-core - - org.hswebframework - hsweb-easy-orm-elasticsearch - - org.elasticsearch.client elasticsearch-rest-high-level-client diff --git a/pom.xml b/pom.xml index dda4d25c..504f842d 100644 --- a/pom.xml +++ b/pom.xml @@ -16,26 +16,27 @@ UTF-8 zh_CN - 2.5.12 + 2.5.13 1.8 ${java.version} 4.0.14-SNAPSHOT - 4.0.14-SNAPSHOT + 4.1.0-SNAPSHOT 3.0.2 1.2.0-SNAPSHOT - Arabba-SR10 - 4.1.73.Final + Borca-SR1 + 4.1.74.Final 7.11.2 1.0.3-SNAPSHOT 1.0.14-SNAPSHOT 3.3.1 1.2.83 - 2020.0.6 - 4.2.3 + 2020.0.18 + 4.3.0 2.17.1 1.2.9 1.6.6 2.13.2.20220328 + 1.13.0 @@ -161,6 +162,12 @@ + + com.h2database + h2 + 2.1.210 + + org.springdoc springdoc-openapi-common @@ -352,7 +359,7 @@ com.google.guava guava - 28.0-jre + 31.0.1-jre @@ -397,13 +404,6 @@ - - org.hswebframework - hsweb-easy-orm-elasticsearch - ${easyorm.version} - - - @@ -448,14 +448,14 @@ org.testcontainers testcontainers - 1.16.2 + 1.17.2 test org.testcontainers junit-jupiter - 1.16.2 + 1.17.2 test