refactor: 使用新的协议加载逻辑. (#491)

使用eventbus来传递协议变更事件
This commit is contained in:
老周 2024-04-07 15:00:19 +08:00 committed by GitHub
parent cf2c781414
commit b61b0e3568
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 20 additions and 83 deletions

View File

@ -1,96 +1,37 @@
package org.jetlinks.community.protocol;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.supports.protocol.StaticProtocolSupports;
import org.jetlinks.supports.protocol.management.ProtocolSupportDefinition;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.supports.protocol.management.DefaultProtocolSupportManager;
import org.jetlinks.supports.protocol.management.ProtocolSupportLoader;
import org.jetlinks.supports.protocol.management.ProtocolSupportManager;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
@Slf4j
@Getter
@Setter
@Order(Ordered.HIGHEST_PRECEDENCE)
public class LazyInitManagementProtocolSupports extends StaticProtocolSupports implements CommandLineRunner {
private ProtocolSupportManager manager;
private ProtocolSupportLoader loader;
private ClusterManager clusterManager;
@Setter(AccessLevel.PRIVATE)
private Map<String, String> configProtocolIdMapping = new ConcurrentHashMap<>();
private Duration loadTimeOut = Duration.ofSeconds(30);
public void init() {
clusterManager.<ProtocolSupportDefinition>getTopic("_protocol_changed")
.subscribe()
.subscribe(protocol -> this.init(protocol).subscribe());
try {
manager
.loadAll()
.filter(de -> de.getState() == 1)
.flatMap(this::init)
.blockLast(loadTimeOut);
} catch (Throwable e) {
log.error("load protocol error", e);
}
public class LazyInitManagementProtocolSupports extends DefaultProtocolSupportManager implements CommandLineRunner {
public LazyInitManagementProtocolSupports(EventBus eventBus,
ClusterManager clusterManager,
ProtocolSupportLoader loader) {
super(eventBus, clusterManager.getCache("__protocol_supports"), loader);
}
public Mono<Void> init(ProtocolSupportDefinition definition) {
try {
if (definition.getState() != 1) {
String protocol = configProtocolIdMapping.get(definition.getId());
if (protocol != null) {
log.debug("uninstall protocol:{}", definition);
unRegister(protocol);
return Mono.empty();
}
}
String operation = definition.getState() != 1 ? "uninstall" : "install";
Consumer<ProtocolSupport> consumer = definition.getState() != 1 ? this::unRegister : this::register;
log.debug("{} protocol:{}", operation, definition);
return loader
.load(definition)
.doOnNext(e -> {
e.init(definition.getConfiguration());
log.debug("{} protocol[{}] success: {}", operation, definition.getId(), e);
configProtocolIdMapping.put(definition.getId(), e.getId());
consumer.accept(e);
})
.onErrorResume((e) -> {
log.error("{} protocol[{}] error: {}", operation, definition.getId(), e.getLocalizedMessage());
return Mono.empty();
})
.then();
} catch (Throwable err) {
log.error("init protocol error", err);
}
return Mono.empty();
public void init() {
super.init();
}
@Override
public void run(String... args) {
init();
}
}

View File

@ -28,10 +28,10 @@ import org.springframework.web.reactive.function.client.WebClient;
@AutoConfigureBefore(DeviceClusterConfiguration.class)
public class ProtocolAutoConfiguration {
@Bean
public ProtocolSupportManager protocolSupportManager(ClusterManager clusterManager) {
return new ClusterProtocolSupportManager(clusterManager);
}
// @Bean
// public ProtocolSupportManager protocolSupportManager(ClusterManager clusterManager) {
// return new ClusterProtocolSupportManager(clusterManager);
// }
@Bean
public ServiceContext serviceContext(ApplicationContext applicationContext) {
@ -39,14 +39,10 @@ public class ProtocolAutoConfiguration {
}
@Bean
public LazyInitManagementProtocolSupports managementProtocolSupports(ProtocolSupportManager supportManager,
ProtocolSupportLoader loader,
ClusterManager clusterManager) {
LazyInitManagementProtocolSupports supports = new LazyInitManagementProtocolSupports();
supports.setClusterManager(clusterManager);
supports.setManager(supportManager);
supports.setLoader(loader);
return supports;
public LazyInitManagementProtocolSupports managementProtocolSupports(EventBus eventBus,
ClusterManager clusterManager,
ProtocolSupportLoader loader) {
return new LazyInitManagementProtocolSupports(eventBus, clusterManager, loader);
}
@Bean
@ -57,7 +53,7 @@ public class ProtocolAutoConfiguration {
@Bean
public AutoDownloadJarProtocolSupportLoader autoDownloadJarProtocolSupportLoader(WebClient.Builder builder, FileManager fileManager) {
return new AutoDownloadJarProtocolSupportLoader(builder,fileManager);
return new AutoDownloadJarProtocolSupportLoader(builder, fileManager);
}
@Bean