优化集群

This commit is contained in:
zhouhao 2022-05-17 17:51:39 +08:00
parent 33864d544c
commit 890fe14b31
3 changed files with 19 additions and 46 deletions

View File

@ -20,6 +20,7 @@ import org.jetlinks.supports.scalecube.ExtendedCluster;
import org.jetlinks.supports.scalecube.ExtendedClusterImpl;
import org.jetlinks.supports.scalecube.ExtendedServiceDiscoveryImpl;
import org.jetlinks.supports.scalecube.event.ScalecubeEventBusBroker;
import org.jetlinks.supports.scalecube.rpc.ScalecubeRpcManager;
import org.nustaq.serialization.FSTConfiguration;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
@ -33,7 +34,7 @@ import reactor.core.scheduler.Schedulers;
import java.util.stream.Collectors;
@Configuration
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(ClusterProperties.class)
@ConditionalOnClass(ExtendedCluster.class)
public class ClusterConfiguration {
@ -104,27 +105,14 @@ public class ClusterConfiguration {
return new RedisClusterManager(properties.getName(), properties.getId(), template);
}
@Bean
public Microservices microservices(ExtendedCluster cluster,
ObjectProvider<ServiceInfo> infos,
ObjectProvider<ServiceProvider> providers,
ClusterProperties properties) {
return Microservices
.builder()
.services(infos.stream().toArray())
.services(call -> providers
.stream()
.flatMap(provider -> provider.provide(call).stream())
.collect(Collectors.toList()))
.serviceRegistry(new DynamicServiceRegistry())
.discovery(serviceEndpoint -> new ExtendedServiceDiscoveryImpl(cluster, serviceEndpoint))
@Bean(initMethod = "startAwait", destroyMethod = "stopAwait")
public ScalecubeRpcManager rpcManager(ExtendedCluster cluster, ClusterProperties properties) {
return new ScalecubeRpcManager(cluster,
() -> new RSocketServiceTransport()
.serverTransportFactory(RSocketServerTransportFactory.tcp(properties.getRpcPort()))
.clientTransportFactory(RSocketClientTransportFactory.tcp()))
.externalHost(properties.getRpcExternalHost())
.externalPort(properties.getRpcExternalPort())
.transport(() -> new RSocketServiceTransport()
.serverTransportFactory(RSocketServerTransportFactory.tcp(properties.getRpcPort()))
.clientTransportFactory(RSocketClientTransportFactory.tcp())
)
.startAwait();
.externalPort(properties.getRpcExternalPort());
}
}

View File

@ -2,12 +2,8 @@ package org.jetlinks.community.configure.device;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.guava.CaffeinatedGuava;
import io.scalecube.services.Microservices;
import io.scalecube.services.ServiceInfo;
import io.vavr.Lazy;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.web.crud.annotation.EnableEasyormRepository;
import org.jetlinks.community.configure.cluster.ClusterProperties;
import org.jetlinks.community.micrometer.MeterRegistryManager;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.cluster.ClusterManager;
@ -17,21 +13,20 @@ import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.DeviceStateChecker;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.supports.cluster.ClusterDeviceOperationBroker;
import org.jetlinks.supports.cluster.ClusterDeviceRegistry;
import org.jetlinks.supports.device.session.MicroserviceDeviceSessionManager;
import org.jetlinks.supports.scalecube.ExtendedCluster;
import org.jetlinks.supports.server.ClusterSendToDeviceMessageHandler;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@Configuration(proxyBeanMethods = false)
@EnableEasyormRepository("org.jetlinks.community.configure.device.PersistentSessionEntity")
@ConditionalOnBean(ProtocolSupports.class)
public class DeviceClusterConfiguration {
@ -68,19 +63,11 @@ public class DeviceClusterConfiguration {
}
@Bean(initMethod = "init", destroyMethod = "shutdown")
@ConditionalOnBean(Microservices.class)
public PersistenceDeviceSessionManager deviceSessionManager(ExtendedCluster cluster,
Microservices microservices,
@ConditionalOnBean(RpcManager.class)
public PersistenceDeviceSessionManager deviceSessionManager(RpcManager rpcManager,
ReactiveRepository<PersistentSessionEntity, String> repository) {
return new PersistenceDeviceSessionManager(cluster, microservices.call(),repository);
}
@Bean
public ServiceInfo sessionManagerServiceInfo(ClusterProperties properties, ApplicationContext context) {
return MicroserviceDeviceSessionManager
.createService(properties.getId(),
Lazy.of(() -> context.getBean(MicroserviceDeviceSessionManager.class)));
return new PersistenceDeviceSessionManager(rpcManager,repository);
}
@ConditionalOnBean(DecodedClientMessageHandler.class)

View File

@ -1,14 +1,13 @@
package org.jetlinks.community.configure.device;
import io.scalecube.services.ServiceCall;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.session.DeviceSessionEvent;
import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.PersistentSession;
import org.jetlinks.supports.device.session.MicroserviceDeviceSessionManager;
import org.jetlinks.supports.scalecube.ExtendedCluster;
import org.jetlinks.supports.device.session.ClusterDeviceSessionManager;
import org.springframework.beans.BeansException;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
@ -22,15 +21,14 @@ import java.util.function.Function;
import java.util.function.Supplier;
@Slf4j
public class PersistenceDeviceSessionManager extends MicroserviceDeviceSessionManager implements CommandLineRunner, ApplicationContextAware {
public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager implements CommandLineRunner, ApplicationContextAware {
private Supplier<DeviceRegistry> registry;
private final ReactiveRepository<PersistentSessionEntity, String> repository;
public PersistenceDeviceSessionManager(ExtendedCluster cluster,
ServiceCall serviceCall,
public PersistenceDeviceSessionManager(RpcManager rpcManager,
ReactiveRepository<PersistentSessionEntity, String> repository) {
super(cluster, serviceCall);
super(rpcManager);
this.repository = repository;
}