diff --git a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/cluster/ClusterConfiguration.java b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/cluster/ClusterConfiguration.java index 3b371bfd..26350b19 100644 --- a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/cluster/ClusterConfiguration.java +++ b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/cluster/ClusterConfiguration.java @@ -20,6 +20,7 @@ import org.jetlinks.supports.scalecube.ExtendedCluster; import org.jetlinks.supports.scalecube.ExtendedClusterImpl; import org.jetlinks.supports.scalecube.ExtendedServiceDiscoveryImpl; import org.jetlinks.supports.scalecube.event.ScalecubeEventBusBroker; +import org.jetlinks.supports.scalecube.rpc.ScalecubeRpcManager; import org.nustaq.serialization.FSTConfiguration; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; @@ -33,7 +34,7 @@ import reactor.core.scheduler.Schedulers; import java.util.stream.Collectors; -@Configuration +@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties(ClusterProperties.class) @ConditionalOnClass(ExtendedCluster.class) public class ClusterConfiguration { @@ -104,27 +105,14 @@ public class ClusterConfiguration { return new RedisClusterManager(properties.getName(), properties.getId(), template); } - @Bean - public Microservices microservices(ExtendedCluster cluster, - ObjectProvider infos, - ObjectProvider providers, - ClusterProperties properties) { - return Microservices - .builder() - .services(infos.stream().toArray()) - .services(call -> providers - .stream() - .flatMap(provider -> provider.provide(call).stream()) - .collect(Collectors.toList())) - .serviceRegistry(new DynamicServiceRegistry()) - .discovery(serviceEndpoint -> new ExtendedServiceDiscoveryImpl(cluster, serviceEndpoint)) + @Bean(initMethod = "startAwait", destroyMethod = "stopAwait") + public ScalecubeRpcManager rpcManager(ExtendedCluster cluster, ClusterProperties properties) { + return new ScalecubeRpcManager(cluster, + () -> new RSocketServiceTransport() + .serverTransportFactory(RSocketServerTransportFactory.tcp(properties.getRpcPort())) + .clientTransportFactory(RSocketClientTransportFactory.tcp())) .externalHost(properties.getRpcExternalHost()) - .externalPort(properties.getRpcExternalPort()) - .transport(() -> new RSocketServiceTransport() - .serverTransportFactory(RSocketServerTransportFactory.tcp(properties.getRpcPort())) - .clientTransportFactory(RSocketClientTransportFactory.tcp()) - ) - .startAwait(); + .externalPort(properties.getRpcExternalPort()); } } diff --git a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/DeviceClusterConfiguration.java b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/DeviceClusterConfiguration.java index e2ca8352..dce9d51b 100644 --- a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/DeviceClusterConfiguration.java +++ b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/DeviceClusterConfiguration.java @@ -2,12 +2,8 @@ package org.jetlinks.community.configure.device; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.guava.CaffeinatedGuava; -import io.scalecube.services.Microservices; -import io.scalecube.services.ServiceInfo; -import io.vavr.Lazy; import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository; import org.hswebframework.web.crud.annotation.EnableEasyormRepository; -import org.jetlinks.community.configure.cluster.ClusterProperties; import org.jetlinks.community.micrometer.MeterRegistryManager; import org.jetlinks.core.ProtocolSupports; import org.jetlinks.core.cluster.ClusterManager; @@ -17,21 +13,20 @@ import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.device.DeviceStateChecker; import org.jetlinks.core.device.session.DeviceSessionManager; import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor; +import org.jetlinks.core.rpc.RpcManager; import org.jetlinks.core.server.MessageHandler; import org.jetlinks.supports.cluster.ClusterDeviceOperationBroker; import org.jetlinks.supports.cluster.ClusterDeviceRegistry; -import org.jetlinks.supports.device.session.MicroserviceDeviceSessionManager; import org.jetlinks.supports.scalecube.ExtendedCluster; import org.jetlinks.supports.server.ClusterSendToDeviceMessageHandler; import org.jetlinks.supports.server.DecodedClientMessageHandler; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -@Configuration +@Configuration(proxyBeanMethods = false) @EnableEasyormRepository("org.jetlinks.community.configure.device.PersistentSessionEntity") @ConditionalOnBean(ProtocolSupports.class) public class DeviceClusterConfiguration { @@ -68,19 +63,11 @@ public class DeviceClusterConfiguration { } @Bean(initMethod = "init", destroyMethod = "shutdown") - @ConditionalOnBean(Microservices.class) - public PersistenceDeviceSessionManager deviceSessionManager(ExtendedCluster cluster, - Microservices microservices, + @ConditionalOnBean(RpcManager.class) + public PersistenceDeviceSessionManager deviceSessionManager(RpcManager rpcManager, ReactiveRepository repository) { - return new PersistenceDeviceSessionManager(cluster, microservices.call(),repository); - } - - @Bean - public ServiceInfo sessionManagerServiceInfo(ClusterProperties properties, ApplicationContext context) { - return MicroserviceDeviceSessionManager - .createService(properties.getId(), - Lazy.of(() -> context.getBean(MicroserviceDeviceSessionManager.class))); + return new PersistenceDeviceSessionManager(rpcManager,repository); } @ConditionalOnBean(DecodedClientMessageHandler.class) diff --git a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistenceDeviceSessionManager.java b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistenceDeviceSessionManager.java index 741883d1..d9af6eff 100644 --- a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistenceDeviceSessionManager.java +++ b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistenceDeviceSessionManager.java @@ -1,14 +1,13 @@ package org.jetlinks.community.configure.device; -import io.scalecube.services.ServiceCall; import lombok.extern.slf4j.Slf4j; import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.device.session.DeviceSessionEvent; +import org.jetlinks.core.rpc.RpcManager; import org.jetlinks.core.server.session.DeviceSession; import org.jetlinks.core.server.session.PersistentSession; -import org.jetlinks.supports.device.session.MicroserviceDeviceSessionManager; -import org.jetlinks.supports.scalecube.ExtendedCluster; +import org.jetlinks.supports.device.session.ClusterDeviceSessionManager; import org.springframework.beans.BeansException; import org.springframework.boot.CommandLineRunner; import org.springframework.context.ApplicationContext; @@ -22,15 +21,14 @@ import java.util.function.Function; import java.util.function.Supplier; @Slf4j -public class PersistenceDeviceSessionManager extends MicroserviceDeviceSessionManager implements CommandLineRunner, ApplicationContextAware { +public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager implements CommandLineRunner, ApplicationContextAware { private Supplier registry; private final ReactiveRepository repository; - public PersistenceDeviceSessionManager(ExtendedCluster cluster, - ServiceCall serviceCall, + public PersistenceDeviceSessionManager(RpcManager rpcManager, ReactiveRepository repository) { - super(cluster, serviceCall); + super(rpcManager); this.repository = repository; }