From a219e6414e163dfb25c7c6c480ade580ed394ed3 Mon Sep 17 00:00:00 2001 From: liujq Date: Thu, 10 Nov 2022 16:25:14 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81local=E5=8D=8F=E8=AE=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../local/LocalFileProtocolSupport.java | 337 ++++++++++++++++++ .../local/LocalProtocolSupportLoader.java | 51 +++ 2 files changed, 388 insertions(+) create mode 100644 jetlinks-components/protocol-component/src/main/java/org/jetlinks/community/protocol/local/LocalFileProtocolSupport.java create mode 100644 jetlinks-components/protocol-component/src/main/java/org/jetlinks/community/protocol/local/LocalProtocolSupportLoader.java diff --git a/jetlinks-components/protocol-component/src/main/java/org/jetlinks/community/protocol/local/LocalFileProtocolSupport.java b/jetlinks-components/protocol-component/src/main/java/org/jetlinks/community/protocol/local/LocalFileProtocolSupport.java new file mode 100644 index 00000000..df78fd8e --- /dev/null +++ b/jetlinks-components/protocol-component/src/main/java/org/jetlinks/community/protocol/local/LocalFileProtocolSupport.java @@ -0,0 +1,337 @@ +package org.jetlinks.community.protocol.local; + +import com.sun.nio.file.SensitivityWatchEventModifier; +import lombok.Generated; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.jetlinks.core.ProtocolSupport; +import org.jetlinks.core.device.*; +import org.jetlinks.core.message.codec.DeviceMessageCodec; +import org.jetlinks.core.message.codec.Transport; +import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor; +import org.jetlinks.core.metadata.*; +import org.jetlinks.core.route.Route; +import org.jetlinks.core.server.ClientConnection; +import org.jetlinks.core.server.DeviceGatewayContext; +import org.jetlinks.core.spi.ProtocolSupportProvider; +import org.jetlinks.core.spi.ServiceContext; +import org.jetlinks.supports.protocol.management.jar.ProtocolClassLoader; +import org.springframework.util.ClassUtils; +import org.springframework.util.StringUtils; +import reactor.core.Disposable; +import reactor.core.Disposables; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import javax.annotation.Nonnull; +import java.io.File; +import java.net.URL; +import java.nio.file.*; +import java.time.Duration; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +@Slf4j +@Generated +public class LocalFileProtocolSupport implements ProtocolSupport { + + private volatile ProtocolSupport loaded; + + private final Composite disposable = Disposables.composite(); + + + @SneakyThrows + protected void closeClassLoader(ProtocolClassLoader loader) { + if (null != loader) { + loader.close(); + } + } + + @SneakyThrows + public void init(File file, ServiceContext context, String providerName) { + String path = file.isDirectory() ? file.getAbsolutePath() : file.getParentFile().getAbsolutePath(); + + WatchService watchService = FileSystems.getDefault().newWatchService(); + + Consumer doWatch = watch -> { + try { + WatchKey key = watch.register(watchService, new WatchEvent.Kind[]{StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_MODIFY, + StandardWatchEventKinds.ENTRY_DELETE}, + SensitivityWatchEventModifier.HIGH); + + disposable.add(key::cancel); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + Files.walk(Paths.get(path), FileVisitOption.FOLLOW_LINKS) + .filter(p -> p.toFile().isDirectory()) + .forEach(doWatch); + + + AtomicReference ref = new AtomicReference<>(); + + disposable.add(new Disposable() { + @Override + @SneakyThrows + @Generated + public void dispose() { + watchService.close(); + closeClassLoader(ref.get()); + } + }); + + URL[] urls = new URL[]{file.toURI().toURL()}; + Callable init = () -> { + log.debug("{}load local protocol :{}", ref.get() == null ? "" : "re", file); + ProtocolClassLoader loader = new ProtocolClassLoader( + urls, ClassUtils.getDefaultClassLoader() + ); + + ProtocolSupportProvider supportProvider; + + if (StringUtils.hasText(providerName)) { + supportProvider = (ProtocolSupportProvider) Class + .forName(providerName, true, loader) + .newInstance(); + } else { + supportProvider = org.jetlinks.core.utils.ClassUtils + .findImplClass(ProtocolSupportProvider.class, + "**/*.class", + loader, + ProtocolClassLoader::loadSelfClass) + .orElseThrow(() -> new IllegalArgumentException("ProtocolSupportProvider not found")); + } + disposable.add(supportProvider); + + supportProvider.create(context) + .subscribe( + protocol -> { + log.debug("{}load local protocol :{}", ref.get() == null ? "" : "re", protocol); + closeClassLoader(ref.get()); + ref.set(loader); + if (loaded != null) { + loaded.dispose(); + } + loaded = protocol; + }, error -> { + log.error("init local protocol error", error); + supportProvider.dispose(); + closeClassLoader(loader); + }); + return null; + }; + init.call(); + + disposable + .add(Flux + .create(sink -> { + while (!isDisposed()) { + try { + WatchKey watchKey = watchService.take(); + if (watchKey == null) { + continue; + } + watchKey.pollEvents(); + sink.next(watchKey); + watchKey.reset(); + } catch (InterruptedException | ClosedWatchServiceException e) { + break; + } catch (Exception e) { + log.error("init local protocol error", e); + } + } + }) + .elapsed() + .window(Duration.ofSeconds(2)) + .flatMap(window -> window.takeLast(1)) + .delayElements(Duration.ofSeconds(1)) + .doOnNext(tp2 -> { + try { + init.call(); + } catch (Exception e) { + log.error("init local protocol error", e); + } + }) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe() + ); + } + + @Nonnull + @Override + public String getId() { + return loaded.getId(); + } + + @Override + public String getName() { + return loaded.getName(); + } + + @Override + public String getDescription() { + return loaded.getDescription(); + } + + @Override + public Flux getSupportedTransport() { + return loaded.getSupportedTransport(); + } + + @Nonnull + @Override + public Mono getMessageCodec(Transport transport) { + return loaded.getMessageCodec(transport); + } + + @Nonnull + @Override + public DeviceMetadataCodec getMetadataCodec() { + return loaded.getMetadataCodec(); + } + + @Nonnull + @Override + public Mono authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator deviceOperation) { + return loaded.authenticate(request, deviceOperation); + } + + @Nonnull + @Override + public Mono authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceRegistry registry) { + return loaded.authenticate(request, registry); + } + + @Override + public Flux getMetadataCodecs() { + return loaded.getMetadataCodecs(); + } + + @Override + public Mono getConfigMetadata(Transport transport) { + return loaded.getConfigMetadata(transport); + } + + @Override + public Mono getInitConfigMetadata() { + return loaded.getInitConfigMetadata(); + } + + @Override + public Mono getSenderInterceptor() { + return loaded.getSenderInterceptor(); + } + + @Nonnull + @Override + public Mono getStateChecker() { + return loaded.getStateChecker(); + } + + @Override + public Flux getMetadataExpandsConfig(Transport transport, DeviceMetadataType metadataType, String metadataId, String dataTypeId) { + return loaded.getMetadataExpandsConfig(transport, metadataType, metadataId, dataTypeId); + } + + @Override + public Mono onProductRegister(DeviceProductOperator operator) { + return loaded.onProductRegister(operator); + } + + @Override + public Mono onDeviceRegister(DeviceOperator operator) { + return loaded.onDeviceRegister(operator); + } + + @Override + public Mono onDeviceUnRegister(DeviceOperator operator) { + return loaded.onDeviceUnRegister(operator); + } + + @Override + public Mono onProductUnRegister(DeviceProductOperator operator) { + return loaded.onProductUnRegister(operator); + } + + @Override + public Mono getDefaultMetadata(Transport transport) { + return loaded.getDefaultMetadata(transport); + } + + @Override + public Mono onDeviceMetadataChanged(DeviceOperator operator) { + return loaded.onDeviceMetadataChanged(operator); + } + + @Override + public Mono onProductMetadataChanged(DeviceProductOperator operator) { + return loaded.onProductMetadataChanged(operator); + } + + @Override + public void dispose() { + if (loaded != null) { + loaded.dispose(); + } + disposable.dispose(); + } + + @Override + public boolean isDisposed() { + return disposable.isDisposed(); + } + + @Override + public Mono onChildBind(DeviceOperator gateway, Flux child) { + return loaded.onChildBind(gateway, child); + } + + @Override + public Mono onChildUnbind(DeviceOperator gateway, Flux child) { + return loaded.onChildUnbind(gateway, child); + } + + @Override + public Mono onClientConnect(Transport transport, ClientConnection connection, DeviceGatewayContext context) { + return loaded.onClientConnect(transport, connection, context); + } + + @Override + public Flux getFeatures(Transport transport) { + return loaded.getFeatures(transport); + } + + @Override + public Mono doBeforeDeviceCreate(Transport transport, DeviceInfo deviceInfo) { + return loaded.doBeforeDeviceCreate(transport, deviceInfo); + } + + @Override + public int getOrder() { + return loaded.getOrder(); + } + + @Override + public int compareTo(ProtocolSupport o) { + return loaded.compareTo(o); + } + + @Override + public Flux getRoutes(Transport transport) { + return loaded.getRoutes(transport); + } + + @Override + public String getDocument(Transport transport) { + return loaded.getDocument(transport); + } + + @Override + public boolean isEmbedded() { + return loaded.isEmbedded(); + } +} diff --git a/jetlinks-components/protocol-component/src/main/java/org/jetlinks/community/protocol/local/LocalProtocolSupportLoader.java b/jetlinks-components/protocol-component/src/main/java/org/jetlinks/community/protocol/local/LocalProtocolSupportLoader.java new file mode 100644 index 00000000..758f56f6 --- /dev/null +++ b/jetlinks-components/protocol-component/src/main/java/org/jetlinks/community/protocol/local/LocalProtocolSupportLoader.java @@ -0,0 +1,51 @@ +package org.jetlinks.community.protocol.local; + +import lombok.AllArgsConstructor; +import lombok.Generated; +import org.jetlinks.core.spi.ServiceContext; +import org.jetlinks.community.ValueObject; +import org.jetlinks.supports.protocol.management.ProtocolSupportDefinition; +import org.jetlinks.supports.protocol.management.ProtocolSupportLoaderProvider; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +import java.io.File; +import java.io.FileNotFoundException; + +@AllArgsConstructor +@Generated +public class LocalProtocolSupportLoader implements ProtocolSupportLoaderProvider { + + private final ServiceContext serviceContext; + + @Override + public String getProvider() { + return "local"; + } + + @Override + public Mono load(ProtocolSupportDefinition definition) { + + return Mono + .fromCallable(() -> { + ValueObject config = ValueObject.of(definition.getConfiguration()); + + String location = config + .getString("location") + .orElseThrow(() -> new IllegalArgumentException("location cannot be null")); + String provider = config.get("provider") + .map(String::valueOf) + .map(String::trim) + .orElse(null); + File file = new File(location); + if (!file.exists()) { + throw new FileNotFoundException("文件" + file.getName() + "不存在"); + } + + LocalFileProtocolSupport support = new LocalFileProtocolSupport(); + support.init(file, serviceContext, provider); + return support; + }) + .subscribeOn(Schedulers.boundedElastic()); + } +}