diff --git a/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/LazyInitManagementProtocolSupports.java b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/LazyInitManagementProtocolSupports.java index ccfdd469..b971201a 100644 --- a/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/LazyInitManagementProtocolSupports.java +++ b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/LazyInitManagementProtocolSupports.java @@ -1,11 +1,85 @@ package org.jetlinks.community.standalone.configuration; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.jetlinks.core.ProtocolSupport; +import org.jetlinks.core.cluster.ClusterManager; +import org.jetlinks.supports.protocol.StaticProtocolSupports; import org.jetlinks.supports.protocol.management.ManagementProtocolSupports; +import org.jetlinks.supports.protocol.management.ProtocolSupportDefinition; +import org.jetlinks.supports.protocol.management.ProtocolSupportLoader; +import org.jetlinks.supports.protocol.management.ProtocolSupportManager; import org.springframework.boot.CommandLineRunner; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +@Slf4j +@Getter +@Setter +public class LazyInitManagementProtocolSupports extends StaticProtocolSupports implements CommandLineRunner { + + private ProtocolSupportManager manager; + + private ProtocolSupportLoader loader; + + private ClusterManager clusterManager; + + @Setter(AccessLevel.PRIVATE) + private Map configProtocolIdMapping = new ConcurrentHashMap<>(); + + private Duration loadTimeOut = Duration.ofSeconds(30); + + public void init() { + + clusterManager.getTopic("_protocol_changed") + .subscribe() + .subscribe(protocol -> this.init(protocol).subscribe()); + + try { + manager.loadAll() + .filter(de -> de.getState() == 1) + .flatMap(this::init) + .blockLast(loadTimeOut); + } catch (Exception e) { + log.error("load protocol error", e); + } + + } + + public Mono init(ProtocolSupportDefinition definition) { + if (definition.getState() != 1) { + String protocol = configProtocolIdMapping.get(definition.getId()); + if (protocol != null) { + log.debug("uninstall protocol:{}", definition); + unRegister(protocol); + return Mono.empty(); + } + } + String operation = definition.getState() != 1 ? "uninstall" : "install"; + Consumer consumer = definition.getState() != 1 ? this::unRegister : this::register; + + log.debug("{} protocol:{}", operation, definition); + + return loader + .load(definition) + .doOnNext(e -> { + log.debug("{} protocol[{}] success: {}", operation, definition.getId(), e); + configProtocolIdMapping.put(definition.getId(), e.getId()); + consumer.accept(e); + }) + .onErrorContinue((e, obj) -> log.error("{} protocol[{}] error: {}", operation, definition.getId(), e)) + .then(); + + } -public class LazyInitManagementProtocolSupports extends ManagementProtocolSupports implements CommandLineRunner { @Override - public void run(String... args) throws Exception { + public void run(String... args) { init(); } -} +} \ No newline at end of file diff --git a/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/protocol/AutoDownloadJarProtocolSupportLoader.java b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/protocol/AutoDownloadJarProtocolSupportLoader.java index 45c9fa24..8eb47a3b 100644 --- a/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/protocol/AutoDownloadJarProtocolSupportLoader.java +++ b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/protocol/AutoDownloadJarProtocolSupportLoader.java @@ -1,6 +1,7 @@ package org.jetlinks.community.standalone.configuration.protocol; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.digest.DigestUtils; import org.hswebframework.web.bean.FastBeanCopier; import org.jetlinks.core.ProtocolSupport; import org.jetlinks.core.spi.ServiceContext; @@ -31,6 +32,7 @@ import java.util.concurrent.TimeoutException; @Slf4j public class AutoDownloadJarProtocolSupportLoader extends JarProtocolSupportLoader { + final WebClient webClient; final File tempPath; @@ -39,7 +41,7 @@ public class AutoDownloadJarProtocolSupportLoader extends JarProtocolSupportLoad public AutoDownloadJarProtocolSupportLoader(WebClient.Builder builder) { this.webClient = builder.build(); - tempPath = new File(".temp"); + tempPath = new File("./data/protocols"); tempPath.mkdir(); } @@ -58,30 +60,38 @@ public class AutoDownloadJarProtocolSupportLoader extends JarProtocolSupportLoad @Override protected void closeLoader(ProtocolClassLoader loader) { super.closeLoader(loader); - for (URL url : loader.getUrls()) { - if (new File(url.getFile()).delete()) { - log.debug("delete old protocol:{}", url); - } - } +// for (URL url : loader.getUrls()) { +// if (new File(url.getFile()).delete()) { +// log.debug("delete old protocol:{}", url); +// } +// } } @Override public Mono load(ProtocolSupportDefinition definition) { - ProtocolSupportDefinition newDef = FastBeanCopier.copy(definition,new ProtocolSupportDefinition()); + ProtocolSupportDefinition newDef = FastBeanCopier.copy(definition, new ProtocolSupportDefinition()); - Map config =newDef.getConfiguration(); + Map config = newDef.getConfiguration(); String location = Optional.ofNullable(config.get("location")) .map(String::valueOf).orElseThrow(() -> new IllegalArgumentException("location")); if (location.startsWith("http")) { + String urlMd5 = DigestUtils.md5Hex(location); + //地址没变则直接加载本地文件 + File file = new File(tempPath, (newDef.getId() + "_" + urlMd5) + ".jar"); + if (file.exists()) { + config.put("location", file.getAbsolutePath()); + return super + .load(newDef) + .subscribeOn(Schedulers.elastic()); + } return webClient.get() .uri(location) .exchange() .flatMap(clientResponse -> clientResponse.bodyToMono(Resource.class)) .flatMap(resource -> Mono.fromCallable(resource::getInputStream)) .flatMap(stream -> Mono.fromCallable(() -> { - File file = new File(tempPath, (newDef.getId() + "_" + System.currentTimeMillis()) + ".jar"); log.debug("write protocol file {} to {}", location, file.getAbsolutePath()); try (InputStream input = stream; OutputStream out = new FileOutputStream(file)) {