优化配置

This commit is contained in:
zhouhao 2022-09-26 17:28:55 +08:00
parent 279e21a5d3
commit 0242ce4188
14 changed files with 87 additions and 757 deletions

View File

@ -0,0 +1,67 @@
package org.jetlinks.community.configure.redis;
import io.netty.util.concurrent.FastThreadLocal;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.utils.SerializeUtils;
import org.jetlinks.community.codec.Serializers;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInput;
import java.io.ObjectOutput;
@Slf4j
@AllArgsConstructor
public class ObjectRedisSerializer implements RedisSerializer<Object> {
static final FastThreadLocal<ByteArrayOutputStream> STREAM_LOCAL = new FastThreadLocal<ByteArrayOutputStream>() {
@Override
protected ByteArrayOutputStream initialValue() {
return new ByteArrayOutputStream(1024) {
@Override
public void close() {
reset();
}
};
}
};
@Override
@SneakyThrows
public byte[] serialize(Object o) throws SerializationException {
if (o == null) {
return null;
}
ByteArrayOutputStream arr = STREAM_LOCAL.get();
try (ObjectOutput output = Serializers.getDefault().createOutput(arr)) {
SerializeUtils.writeObject(o, output);
output.flush();
return arr.toByteArray();
} catch (Throwable e) {
log.error(e.getMessage(), e);
throw e;
}
}
@Override
@SneakyThrows
public Object deserialize(byte[] bytes) throws SerializationException {
if (bytes == null) {
return null;
}
try (ObjectInput input = Serializers
.getDefault()
.createInput(new ByteArrayInputStream(bytes))) {
return SerializeUtils.readObject(input);
} catch (Throwable e) {
log.error(e.getMessage(), e);
throw e;
}
}
}

View File

@ -1,11 +1,9 @@
package org.jetlinks.community.standalone.configuration;
package org.jetlinks.community.configure.redis;
import org.jetlinks.community.standalone.configuration.fst.FstSerializationRedisSerializer;
import org.nustaq.serialization.FSTConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ResourceLoader;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
@ -13,23 +11,17 @@ import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
@ConditionalOnProperty(prefix = "spring.redis",name = "serializer",havingValue = "fst")
public class JetLinksRedisConfiguration {
@ConditionalOnProperty(prefix = "spring.redis",name = "serializer",havingValue = "obj",matchIfMissing = true)
public class RedisSerializationConfiguration {
@Bean
public ReactiveRedisTemplate<Object, Object> reactiveRedisTemplate(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory, ResourceLoader resourceLoader) {
FstSerializationRedisSerializer serializer = new FstSerializationRedisSerializer(() -> {
FSTConfiguration configuration = FSTConfiguration.createDefaultConfiguration()
.setForceSerializable(true);
configuration.setClassLoader(resourceLoader.getClassLoader());
return configuration;
});
@Primary
public ReactiveRedisTemplate<Object, Object> reactiveRedisTemplate(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ObjectRedisSerializer serializer = new ObjectRedisSerializer();
@SuppressWarnings("all")
RedisSerializationContext<Object, Object> serializationContext = RedisSerializationContext
.newSerializationContext()
.key((RedisSerializer)new StringRedisSerializer())
.key((RedisSerializer) StringRedisSerializer.UTF_8)
.value(serializer)
.hashKey(StringRedisSerializer.UTF_8)
.hashValue(serializer)

View File

@ -2,4 +2,5 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.jetlinks.community.configure.cluster.ClusterConfiguration,\
org.jetlinks.community.configure.doc.SpringDocCustomizerConfiguration,\
org.jetlinks.community.configure.trace.TraceConfiguration,\
org.jetlinks.community.configure.device.DeviceClusterConfiguration
org.jetlinks.community.configure.device.DeviceClusterConfiguration,\
org.jetlinks.community.configure.redis.RedisSerializationConfiguration

View File

@ -23,6 +23,8 @@
<module>logging-component</module>
<module>rule-engine-component</module>
<module>configure-component</module>
<module>things-component</module>
<module>script-component</module>
</modules>
<artifactId>jetlinks-components</artifactId>

View File

@ -3,22 +3,12 @@ package org.jetlinks.community.standalone.configuration;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.authorization.token.UserTokenManager;
import org.hswebframework.web.authorization.token.redis.RedisUserTokenManager;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.spi.ServiceContext;
import org.jetlinks.supports.protocol.ServiceLoaderProtocolSupports;
import org.jetlinks.supports.protocol.management.ClusterProtocolSupportManager;
import org.jetlinks.supports.protocol.management.ProtocolSupportLoader;
import org.jetlinks.supports.protocol.management.ProtocolSupportManager;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.ReactiveRedisOperations;
@Configuration
@EnableConfigurationProperties(JetLinksProperties.class)
@ -46,35 +36,5 @@ public class JetLinksConfiguration {
return Vertx.vertx(vertxOptions);
}
@Bean(initMethod = "init")
@ConditionalOnProperty(prefix = "jetlinks.protocol.spi", name = "enabled", havingValue = "true")
public ServiceLoaderProtocolSupports serviceLoaderProtocolSupports(ServiceContext serviceContext) {
ServiceLoaderProtocolSupports supports = new ServiceLoaderProtocolSupports();
supports.setServiceContext(serviceContext);
return supports;
}
@Bean
@ConfigurationProperties(prefix = "hsweb.user-token")
public UserTokenManager userTokenManager(ReactiveRedisOperations<Object, Object> template) {
return new RedisUserTokenManager(template);
}
@Bean
public ProtocolSupportManager protocolSupportManager(ClusterManager clusterManager) {
return new ClusterProtocolSupportManager(clusterManager);
}
@Bean
public LazyInitManagementProtocolSupports managementProtocolSupports(ProtocolSupportManager supportManager,
ProtocolSupportLoader loader,
ClusterManager clusterManager) {
LazyInitManagementProtocolSupports supports = new LazyInitManagementProtocolSupports();
supports.setClusterManager(clusterManager);
supports.setManager(supportManager);
supports.setLoader(loader);
return supports;
}
}

View File

@ -1,87 +0,0 @@
package org.jetlinks.community.standalone.configuration;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.supports.protocol.StaticProtocolSupports;
import org.jetlinks.supports.protocol.management.ProtocolSupportDefinition;
import org.jetlinks.supports.protocol.management.ProtocolSupportLoader;
import org.jetlinks.supports.protocol.management.ProtocolSupportManager;
import org.springframework.boot.CommandLineRunner;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
@Slf4j
@Getter
@Setter
public class LazyInitManagementProtocolSupports extends StaticProtocolSupports implements CommandLineRunner {
private ProtocolSupportManager manager;
private ProtocolSupportLoader loader;
private ClusterManager clusterManager;
@Setter(AccessLevel.PRIVATE)
private Map<String, String> configProtocolIdMapping = new ConcurrentHashMap<>();
private Duration loadTimeOut = Duration.ofSeconds(30);
public void init() {
clusterManager.<ProtocolSupportDefinition>getTopic("_protocol_changed")
.subscribe()
.subscribe(protocol -> this.init(protocol).subscribe());
try {
manager.loadAll()
.filter(de -> de.getState() == 1)
.flatMap(this::init)
.blockLast(loadTimeOut);
} catch (Throwable e) {
log.error("load protocol error", e);
}
}
public Mono<Void> init(ProtocolSupportDefinition definition) {
if (definition.getState() != 1) {
String protocol = configProtocolIdMapping.get(definition.getId());
if (protocol != null) {
log.debug("uninstall protocol:{}", definition);
unRegister(protocol);
return Mono.empty();
}
}
String operation = definition.getState() != 1 ? "uninstall" : "install";
Consumer<ProtocolSupport> consumer = definition.getState() != 1 ? this::unRegister : this::register;
log.debug("{} protocol:{}", operation, definition);
return loader
.load(definition)
.doOnNext(e -> {
log.debug("{} protocol[{}] success: {}", operation, definition.getId(), e);
configProtocolIdMapping.put(definition.getId(), e.getId());
consumer.accept(e);
})
.onErrorResume((e) -> {
log.error("{} protocol[{}] error: {}", operation, definition.getId(), e);
return Mono.empty();
})
.then();
}
@Override
public void run(String... args) {
init();
}
}

View File

@ -1,208 +0,0 @@
package org.jetlinks.community.standalone.configuration;
import lombok.AllArgsConstructor;
import lombok.Generated;
import lombok.Getter;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.device.*;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.message.codec.DeviceMessageCodec;
import org.jetlinks.core.message.codec.TraceDeviceMessageCodec;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
import org.jetlinks.core.metadata.*;
import org.jetlinks.core.route.Route;
import org.jetlinks.core.server.ClientConnection;
import org.jetlinks.core.server.DeviceGatewayContext;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.Map;
/**
* 重命名协议将协议包里的协议使用进行重命名
*
* @author zhouhao
* @since 1.2
*/
@AllArgsConstructor
@Generated
public class RenameProtocolSupport implements ProtocolSupport {
public static final JetLinksDeviceMetadataCodec metadataCodec = new JetLinksDeviceMetadataCodec();
@Getter
private final String id;
@Getter
private final String name;
@Getter
private final String description;
private final ProtocolSupport target;
@Override
public Flux<? extends Transport> getSupportedTransport() {
return target.getSupportedTransport();
}
@Nonnull
@Override
public Mono<? extends DeviceMessageCodec> getMessageCodec(Transport transport) {
return target
.getMessageCodec(transport)
.map(codec-> new TraceDeviceMessageCodec(id,codec));
}
@Override
public Mono<DeviceMessageSenderInterceptor> getSenderInterceptor() {
return target.getSenderInterceptor();
}
@Nonnull
@Override
@SuppressWarnings("all")
public DeviceMetadataCodec getMetadataCodec() {
return target.getMetadataCodec() == null ? metadataCodec : target.getMetadataCodec();
}
@Nonnull
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request,
@Nonnull DeviceOperator deviceOperation) {
return target.authenticate(request, deviceOperation);
}
@Nonnull
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request,
@Nonnull DeviceRegistry registry) {
return target.authenticate(request, registry);
}
@Override
public Mono<DeviceMetadata> getDefaultMetadata(Transport transport) {
return target.getDefaultMetadata(transport);
}
@Override
public Flux<ConfigMetadata> getMetadataExpandsConfig(Transport transport,
DeviceMetadataType metadataType,
String metadataId,
String dataTypeId) {
return target.getMetadataExpandsConfig(transport, metadataType, metadataId, dataTypeId);
}
@Override
public Flux<DeviceMetadataCodec> getMetadataCodecs() {
return target.getMetadataCodecs();
}
@Override
public Mono<ConfigMetadata> getInitConfigMetadata() {
return target.getInitConfigMetadata();
}
@Nonnull
@Override
public Mono<DeviceStateChecker> getStateChecker() {
return target.getStateChecker();
}
@Override
public Mono<ConfigMetadata> getConfigMetadata(Transport transport) {
return target.getConfigMetadata(transport);
}
@Override
public void init(Map<String, Object> configuration) {
target.init(configuration);
}
@Override
public void dispose() {
target.dispose();
}
@Override
public boolean isDisposed() {
return target.isDisposed();
}
@Override
public Mono<Void> onDeviceUnRegister(DeviceOperator operator) {
return target.onDeviceUnRegister(operator);
}
@Override
public Mono<Void> onDeviceRegister(DeviceOperator operator) {
return target.onDeviceRegister(operator);
}
@Override
public Mono<Void> onProductRegister(DeviceProductOperator operator) {
return target.onProductRegister(operator);
}
@Override
public Mono<Void> onProductUnRegister(DeviceProductOperator operator) {
return target.onProductUnRegister(operator);
}
@Override
public Mono<Void> onDeviceMetadataChanged(DeviceOperator operator) {
return target.onDeviceMetadataChanged(operator);
}
@Override
public Mono<Void> onProductMetadataChanged(DeviceProductOperator operator) {
return target.onProductMetadataChanged(operator);
}
@Override
public Mono<Void> onChildBind(DeviceOperator gateway, Flux<DeviceOperator> child) {
return target.onChildBind(gateway, child);
}
@Override
public Mono<Void> onChildUnbind(DeviceOperator gateway, Flux<DeviceOperator> child) {
return target.onChildUnbind(gateway, child);
}
@Override
public Mono<Void> onClientConnect(Transport transport, ClientConnection connection, DeviceGatewayContext context) {
return target.onClientConnect(transport, connection, context);
}
@Override
public Flux<Feature> getFeatures(Transport transport) {
return target.getFeatures(transport);
}
@Override
public Mono<DeviceInfo> doBeforeDeviceCreate(Transport transport, DeviceInfo deviceInfo) {
return target.doBeforeDeviceCreate(transport, deviceInfo);
}
@Override
public int getOrder() {
return target.getOrder();
}
@Override
public int compareTo(ProtocolSupport o) {
return target.compareTo(o);
}
@Override
public Flux<Route> getRoutes(Transport transport) {
return target.getRoutes(transport);
}
@Override
public String getDocument(Transport transport) {
return target.getDocument(transport);
}
}

View File

@ -1,41 +0,0 @@
package org.jetlinks.community.standalone.configuration;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.supports.protocol.management.ProtocolSupportDefinition;
import org.jetlinks.supports.protocol.management.ProtocolSupportLoader;
import org.jetlinks.supports.protocol.management.ProtocolSupportLoaderProvider;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class SpringProtocolSupportLoader implements ProtocolSupportLoader,BeanPostProcessor {
private final Map<String, ProtocolSupportLoaderProvider> providers = new ConcurrentHashMap<>();
public void register(ProtocolSupportLoaderProvider provider) {
this.providers.put(provider.getProvider(), provider);
}
@Override
public Mono<? extends ProtocolSupport> load(ProtocolSupportDefinition definition) {
return Mono
.justOrEmpty(this.providers.get(definition.getProvider()))
.switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("unsupported provider:" + definition.getProvider())))
.flatMap((provider) -> provider.load(definition))
.map(loaded -> new RenameProtocolSupport(definition.getId(), definition.getName(), definition.getDescription(), loaded));
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof ProtocolSupportLoaderProvider) {
register(((ProtocolSupportLoaderProvider) bean));
}
return bean;
}
}

View File

@ -1,25 +0,0 @@
package org.jetlinks.community.standalone.configuration;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.defaults.CompositeProtocolSupports;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
@Component
@Primary
public class SpringProtocolSupports extends CompositeProtocolSupports implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object o, String s) throws BeansException {
if (o == this) {
return o;
}
if (o instanceof ProtocolSupports) {
register(((ProtocolSupports) o));
}
return o;
}
}

View File

@ -1,70 +0,0 @@
package org.jetlinks.community.standalone.configuration;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.Value;
import org.jetlinks.core.config.ConfigKey;
import org.jetlinks.core.spi.ServiceContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@Component
@Slf4j
public class SpringServiceContext implements ServiceContext {
@Autowired
private ApplicationContext applicationContext;
@Override
public Optional<Value> getConfig(ConfigKey<String> key) {
return getConfig(key.getKey());
}
@Override
public Optional<Value> getConfig(String key) {
return Optional.ofNullable(applicationContext.getEnvironment()
.getProperty(key))
.map(Value::simple)
;
}
@Override
public <T> Optional<T> getService(Class<T> service) {
try {
return Optional.of(applicationContext.getBean(service));
} catch (Exception e) {
log.error("load service [{}] error", service, e);
return Optional.empty();
}
}
@Override
public <T> Optional<T> getService(String service) {
try {
return Optional.of((T)applicationContext.getBean(service));
} catch (Exception e) {
log.error("load service [{}] error", service, e);
return Optional.empty();
}
}
@Override
public <T> List<T> getServices(Class<T> service) {
try {
return new ArrayList<>(applicationContext.getBeansOfType(service).values());
}catch (Exception e){
log.error("load service [{}] error", service, e);
return Collections.emptyList();
}
}
@Override
public <T> List<T> getServices(String service) {
return Collections.emptyList();
}
}

View File

@ -1,50 +0,0 @@
package org.jetlinks.community.standalone.configuration.doc;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.enums.SecuritySchemeIn;
import io.swagger.v3.oas.annotations.enums.SecuritySchemeType;
import io.swagger.v3.oas.annotations.info.Contact;
import io.swagger.v3.oas.annotations.info.Info;
import io.swagger.v3.oas.annotations.security.SecurityScheme;
import io.swagger.v3.oas.annotations.security.SecuritySchemes;
import org.hswebframework.web.crud.web.ResponseMessage;
import org.reactivestreams.Publisher;
import org.springdoc.core.ReturnTypeParser;
import org.springdoc.webflux.core.SpringDocWebFluxConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
@Configuration(proxyBeanMethods = false)
@OpenAPIDefinition(
info = @Info(
title = "物联网平台",
description = "物联网平台接口文档",
contact = @Contact(name = "admin",url = "https://github.com/jetlinks"),
version = "1.12.0"
)
)
@SecuritySchemes(
{
@SecurityScheme(
type = SecuritySchemeType.APIKEY,
name = "Token",
paramName = "X-Access-Token",
in = SecuritySchemeIn.HEADER,
description = "认证token"
)
}
)
@AutoConfigureBefore(SpringDocWebFluxConfiguration.class)
public class SwaggerConfiguration {
}

View File

@ -1,48 +0,0 @@
package org.jetlinks.community.standalone.configuration.fst;
import io.netty.util.concurrent.FastThreadLocal;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.FSTObjectInput;
import org.nustaq.serialization.FSTObjectOutput;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import java.io.ByteArrayOutputStream;
import java.util.function.Supplier;
@AllArgsConstructor
public class FstSerializationRedisSerializer implements RedisSerializer<Object> {
private final FastThreadLocal<FSTConfiguration> configuration;
public FstSerializationRedisSerializer(Supplier<FSTConfiguration> supplier) {
this(new FastThreadLocal<FSTConfiguration>() {
@Override
protected FSTConfiguration initialValue() {
return supplier.get();
}
});
}
@Override
@SneakyThrows
public byte[] serialize(Object o) throws SerializationException {
ByteArrayOutputStream arr = new ByteArrayOutputStream(1024);
try (FSTObjectOutput output = configuration.get().getObjectOutput(arr)) {
output.writeObject(o);
}
return arr.toByteArray();
}
@Override
@SneakyThrows
public Object deserialize(byte[] bytes) throws SerializationException {
try (FSTObjectInput input = configuration.get().getObjectInput(bytes)) {
return input.readObject();
}
}
}

View File

@ -1,163 +0,0 @@
package org.jetlinks.community.standalone.configuration.protocol;
import lombok.Generated;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.spi.ServiceContext;
import org.jetlinks.community.io.file.FileManager;
import org.jetlinks.community.utils.TimeUtils;
import org.jetlinks.supports.protocol.management.ProtocolSupportDefinition;
import org.jetlinks.supports.protocol.management.jar.JarProtocolSupportLoader;
import org.jetlinks.supports.protocol.management.jar.ProtocolClassLoader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import javax.annotation.PreDestroy;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import static java.nio.file.StandardOpenOption.*;
/**
* 自动下载并缓存协议包
* <pre>
* 1. 下载的协议包报错在./data/protocols目录下可通过启动参数-Djetlinks.protocol.temp.path进行配置
* 2. 文件名规则: 协议ID+"_"+md5(文件地址)
* 3. 如果文件不存在则下载协议
* </pre>
*
* @author zhouhao
* @since 1.3
*/
@Component
@Slf4j
public class AutoDownloadJarProtocolSupportLoader extends JarProtocolSupportLoader {
final WebClient webClient;
final File tempPath;
private final Duration loadTimeout = TimeUtils.parse(System.getProperty("jetlinks.protocol.load.timeout", "30s"));
private final FileManager fileManager;
public AutoDownloadJarProtocolSupportLoader(WebClient.Builder builder,
FileManager fileManager) {
this.webClient = builder.build();
this.fileManager = fileManager;
tempPath = new File(System.getProperty("jetlinks.protocol.temp.path", "./data/protocols"));
tempPath.mkdirs();
}
@Override
@Autowired
@Generated
public void setServiceContext(ServiceContext serviceContext) {
super.setServiceContext(serviceContext);
}
@Override
@PreDestroy
@Generated
protected void closeAll() {
super.closeAll();
}
@Override
protected void closeLoader(ProtocolClassLoader loader) {
super.closeLoader(loader);
}
@Override
public Mono<? extends ProtocolSupport> load(ProtocolSupportDefinition definition) {
//复制新的配置信息
ProtocolSupportDefinition newDef = FastBeanCopier.copy(definition, new ProtocolSupportDefinition());
Map<String, Object> config = newDef.getConfiguration();
String location = Optional
.ofNullable(config.get("location"))
.map(String::valueOf)
.orElse(null);
//远程文件则先下载再加载
if (StringUtils.hasText(location) && location.startsWith("http")) {
String urlMd5 = DigestUtils.md5Hex(location);
//地址没变则直接加载本地文件
File file = new File(tempPath, (newDef.getId() + "_" + urlMd5) + ".jar");
if (file.exists()) {
//设置文件地址文本地文件
config.put("location", file.getAbsolutePath());
return super
.load(newDef)
.subscribeOn(Schedulers.boundedElastic())
//加载失败则删除文件,防止文件内容错误时,一直无法加载
.doOnError(err -> file.delete());
}
return webClient
.get()
.uri(location)
.retrieve()
.bodyToFlux(DataBuffer.class)
.as(dataStream -> {
log.debug("download protocol file {} to {}", location, file.getAbsolutePath());
//写出文件
return DataBufferUtils
.write(dataStream, file.toPath(), CREATE, WRITE)
.thenReturn(file.getAbsolutePath());
})
//使用弹性线程池来写出文件
.subscribeOn(Schedulers.boundedElastic())
//设置本地文件路径
.doOnNext(path -> config.put("location", path))
.then(super.load(newDef))
.timeout(loadTimeout, Mono.error(() -> new TimeoutException("获取协议文件失败:" + location)))
//失败时删除文件
.doOnError(err -> file.delete())
;
}
//使用文件管理器获取文件
String fileId = (String) config.getOrDefault("fileId", null);
if (!StringUtils.hasText(fileId)) {
return Mono.error(new IllegalArgumentException("location or fileId can not be empty"));
}
return loadFromFileManager(newDef.getId(), fileId)
.flatMap(file -> {
config.put("location", file.getAbsolutePath());
return super
.load(newDef)
.subscribeOn(Schedulers.boundedElastic())
//加载失败则删除文件,防止文件内容错误时,一直无法加载
.doOnError(err -> file.delete());
});
}
private Mono<File> loadFromFileManager(String protocolId, String fileId) {
Path path = Paths.get(tempPath.getPath(), (protocolId + "_" + fileId) + ".jar");
File file = path.toFile();
if (file.exists()) {
return Mono.just(file);
}
return DataBufferUtils
.write(fileManager.read(fileId),
path, CREATE_NEW, TRUNCATE_EXISTING, WRITE)
.thenReturn(file);
}
}

16
pom.xml
View File

@ -16,7 +16,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.locales>zh_CN</project.build.locales>
<spring.boot.version>2.5.13</spring.boot.version>
<spring.boot.version>2.7.1</spring.boot.version>
<java.version>1.8</java.version>
<project.build.jdk>${java.version}</project.build.jdk>
<hsweb.framework.version>4.0.15-SNAPSHOT</hsweb.framework.version>
@ -24,14 +24,14 @@
<hsweb.expands.version>3.0.2</hsweb.expands.version>
<jetlinks.version>1.2.0-SNAPSHOT</jetlinks.version>
<r2dbc.version>Borca-SR1</r2dbc.version>
<netty.version>4.1.77.Final</netty.version>
<elasticsearch.version>7.11.2</elasticsearch.version>
<reactor.excel.version>1.0.3</reactor.excel.version>
<reactor.ql.version>1.0.14</reactor.ql.version>
<californium.version>3.3.1</californium.version>
<netty.version>4.1.79.Final</netty.version>
<elasticsearch.version>7.17.5</elasticsearch.version>
<reactor.excel.version>1.0.3-SNAPSHOT</reactor.excel.version>
<reactor.ql.version>1.0.14-SNAPSHOT</reactor.ql.version>
<californium.version>3.6.0</californium.version>
<fastjson.version>1.2.83</fastjson.version>
<reactor.version>2020.0.18</reactor.version>
<vertx.version>4.3.0</vertx.version>
<reactor.version>2020.0.20</reactor.version>
<vertx.version>4.3.1</vertx.version>
<log4j.version>2.17.1</log4j.version>
<logback.version>1.2.9</logback.version>
<springdoc.version>1.6.6</springdoc.version>