From 25b949c395e36d53ccdbc22e39979f2c1b81ef30 Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Mon, 6 Jul 2020 15:40:55 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=8D=8F=E8=AE=AE=E5=8A=A0?= =?UTF-8?q?=E8=BD=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../configuration/JetLinksConfiguration.java | 8 -- .../AutoDownloadJarProtocolSupportLoader.java | 93 +++++++++++++++++++ 2 files changed, 93 insertions(+), 8 deletions(-) create mode 100644 jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/protocol/AutoDownloadJarProtocolSupportLoader.java diff --git a/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java index 2ce76625..493c0298 100644 --- a/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java +++ b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java @@ -243,14 +243,6 @@ public class JetLinksConfiguration { return new ClusterProtocolSupportManager(clusterManager); } - @Bean - public JarProtocolSupportLoader jarProtocolSupportLoader(ServiceContext serviceContext) { - JarProtocolSupportLoader loader = new JarProtocolSupportLoader(); - loader.setServiceContext(serviceContext); - return loader; - } - - @Bean public LazyInitManagementProtocolSupports managementProtocolSupports(ProtocolSupportManager supportManager, ProtocolSupportLoader loader, diff --git a/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/protocol/AutoDownloadJarProtocolSupportLoader.java b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/protocol/AutoDownloadJarProtocolSupportLoader.java new file mode 100644 index 00000000..fde5344f --- /dev/null +++ b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/protocol/AutoDownloadJarProtocolSupportLoader.java @@ -0,0 +1,93 @@ +package org.jetlinks.community.standalone.configuration.protocol; + +import lombok.extern.slf4j.Slf4j; +import org.hswebframework.web.bean.FastBeanCopier; +import org.jetlinks.core.ProtocolSupport; +import org.jetlinks.core.spi.ServiceContext; +import org.jetlinks.community.utils.TimeUtils; +import org.jetlinks.supports.protocol.management.ProtocolSupportDefinition; +import org.jetlinks.supports.protocol.management.jar.JarProtocolSupportLoader; +import org.jetlinks.supports.protocol.management.jar.ProtocolClassLoader; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.io.Resource; +import org.springframework.stereotype.Component; +import org.springframework.util.StreamUtils; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeoutException; + +@Component +@Slf4j +public class AutoDownloadJarProtocolSupportLoader extends JarProtocolSupportLoader { + + final WebClient webClient; + + final File tempPath; + + private final Duration loadTimeout = TimeUtils.parse(System.getProperty("jetlinks.protocol.load.timeout", "10s")); + + public AutoDownloadJarProtocolSupportLoader(WebClient.Builder builder) { + this.webClient = builder.build(); + tempPath = new File(".temp"); + tempPath.mkdir(); + } + + @Override + @Autowired + public void setServiceContext(ServiceContext serviceContext) { + super.setServiceContext(serviceContext); + } + + @Override + protected void closeLoader(ProtocolClassLoader loader) { + super.closeLoader(loader); + for (URL url : loader.getUrls()) { + if (new File(url.getFile()).delete()) { + log.debug("delete old protocol:{}", url); + } + } + } + + @Override + public Mono load(ProtocolSupportDefinition definition) { + + ProtocolSupportDefinition newDef = FastBeanCopier.copy(definition,new ProtocolSupportDefinition()); + + Map config =newDef.getConfiguration(); + String location = Optional.ofNullable(config.get("location")) + .map(String::valueOf).orElseThrow(() -> new IllegalArgumentException("location")); + + if (location.startsWith("http")) { + return webClient.get() + .uri(location) + .exchange() + .flatMap(clientResponse -> clientResponse.bodyToMono(Resource.class)) + .flatMap(resource -> Mono.fromCallable(resource::getInputStream)) + .flatMap(stream -> Mono.fromCallable(() -> { + File file = new File(tempPath, (newDef.getId() + "_" + System.currentTimeMillis()) + ".jar"); + log.debug("write protocol file {} to {}", location, file.getAbsolutePath()); + try (InputStream input = stream; + OutputStream out = new FileOutputStream(file)) { + StreamUtils.copy(input, out); + } + return file.getAbsolutePath(); + })) + .subscribeOn(Schedulers.elastic()) + .doOnNext(path -> config.put("location", path)) + .then(super.load(newDef)) + .timeout(loadTimeout, Mono.error(() -> new TimeoutException("获取协议文件失败:" + location))) + ; + } + return super.load(newDef); + } +}